- 准备工作:安装和配置 ActiveMQ。
- STOMP 协议简介:了解我们将使用的通信语言。
- C/C++ 客户端开发:
- 使用 libcurl 库(推荐,简单易用)
- 使用 POSIX Sockets(更底层,有助于理解原理)
- 完整代码示例:生产者和消费者。
- 总结与进阶。
准备工作:安装和运行 ActiveMQ
在开始之前,你需要一个正在运行的 ActiveMQ 服务器。

(图片来源网络,侵删)
-
下载 ActiveMQ:
- 访问 Apache ActiveMQ 官网。
- 下载最新的二进制发行版(
apache-activemq-5.18.3-bin.zip)。
-
启动 ActiveMQ:
- 解压下载的文件。
- 进入
bin目录(在 Windows 下是bin\win64,在 Linux/macOS 下是bin)。 - 运行启动脚本:
- Windows:
activemq.bat - Linux/macOS:
./activemq start
- Windows:
- 启动后,你可以通过浏览器访问管理控制台:
http://localhost:8161(默认用户名/密码是admin/admin)。
-
启用 STOMP 协议:
- ActiveMQ 默认已经启用了 STOMP 协议,端口为 61613,你可以在管理控制台的 "Connections" 标签页下看到
stomp连接器。
- ActiveMQ 默认已经启用了 STOMP 协议,端口为 61613,你可以在管理控制台的 "Connections" 标签页下看到
STOMP 协议简介
STOMP 是一个简单的协议,客户端和服务器之间通过发送帧来通信,每个帧由三部分组成:

(图片来源网络,侵删)
- 命令:如
CONNECT,SEND,SUBSCRIBE,UNSUBSCRIBE,DISCONNECT等。 - 头:键值对,提供关于帧的元信息,如
destination(队列/主题名),receipt(请求确认),content-type(消息类型)。 - 体:实际的消息内容,可以是文本或二进制数据。
发送一条消息到队列 TEST.QUEUE:
SEND
destination:/queue/TEST.QUEUE
content-type:text/plain
Hello from C Client!
一个空行 (\n\n) 分隔头和体。
C/C++ 客户端开发
我们将实现两个程序:一个生产者(发送消息)和一个消费者(接收消息)。
使用 libcurl 库(推荐)
libcurl 是一个强大的客户端 URL 传输库,支持多种协议,包括 HTTP 和 STOMP(通过原始 TCP 模式),这使得它成为连接 ActiveMQ 的绝佳选择。
步骤 1: 安装 libcurl
- Debian/Ubuntu:
sudo apt-get install libcurl4-openssl-dev - RedHat/CentOS:
sudo yum install libcurl-devel - Windows: 从 curl 官网 下载预编译的二进制包,并将其包含路径和库路径添加到你的 IDE 或编译器中。
步骤 2: C/C++ 代码示例
下面是使用 libcurl 实现 STOMP 生产者和消费者的代码。
stomp_producer.c (生产者)
#include <stdio.h>
#include <string.h>
#include <curl/curl.h>
// 定义 ActiveMQ STOMP 服务器地址
#define STOMP_URL "stomp://localhost:61613"
#define QUEUE_NAME "/queue/TEST.QUEUE"
#define STOMP_LOGIN "admin"
#define STOMP_PASSCODE "admin"
// 用于 libcurl 写入回调的缓冲区
struct MemoryStruct {
char *memory;
size_t size;
};
static size_t
WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp)
{
size_t realsize = size * nmemb;
struct MemoryStruct *mem = (struct MemoryStruct *)userp;
char *ptr = realloc(mem->memory, mem->size + realsize + 1);
if(!ptr) {
/* out of memory! */
printf("not enough memory (realloc returned NULL)\n");
return 0;
}
mem->memory = ptr;
memcpy(&(mem->memory[mem->size]), contents, realsize);
mem->size += realsize;
mem->memory[mem->size] = 0;
return realsize;
}
void send_stomp_message(const char* message) {
CURL *curl;
CURLcode res;
struct MemoryStruct chunk;
chunk.memory = malloc(1); // will be grown by realloc
chunk.size = 0;
curl_global_init(CURL_GLOBAL_ALL);
curl = curl_easy_init();
if(curl) {
// 1. 构建 STOMP SEND 帧体
char body[1024];
int body_len = snprintf(body, sizeof(body), "%s", message);
// 2. 构建完整的 STOMP SEND 帧
char stomp_frame[2048];
int frame_len = snprintf(stomp_frame, sizeof(stomp_frame),
"SEND\n"
"destination:%s\n"
"content-type:text/plain\n"
"\n" // 空行分隔头和体
"%s",
QUEUE_NAME, message
);
printf("Sending STOMP frame:\n---\n%s\n---\n", stomp_frame);
// 3. 设置 libcurl 选项
curl_easy_setopt(curl, CURLOPT_URL, STOMP_URL);
curl_easy_setopt(curl, CURLOPT_USERNAME, STOMP_LOGIN);
curl_easy_setopt(curl, CURLOPT_PASSWORD, STOMP_PASSCODE);
// 以 POST 方式发送原始数据
curl_easy_setopt(curl, CURLOPT_POST, 1L);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, stomp_frame);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)frame_len);
// 以二进制模式发送,防止 libcurl 修改换行符
curl_easy_setopt(curl, CURLOPT_BINARYTRANSFER, 1L);
// 设置回调函数来接收服务器响应
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&chunk);
// 执行请求
res = curl_easy_perform(curl);
// 检查错误
if(res != CURLE_OK) {
fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
} else {
printf("Server response:\n---\n%s\n---\n", chunk.memory);
}
// 清理
curl_easy_cleanup(curl);
free(chunk.memory);
}
curl_global_cleanup();
}
int main() {
send_stomp_message("Hello from C Producer! Message 1.");
send_stomp_message("Hello from C Producer! Message 2.");
return 0;
}
stomp_consumer.c (消费者)
消费者需要先订阅,然后保持连接以持续接收消息,这里我们使用 CURLOPT_TCP_KEEPALIVE 和循环来模拟持续监听。
#include <stdio.h>
#include <string.h>
#include <curl/curl.h>
#define STOMP_URL "stomp://localhost:61613"
#define QUEUE_NAME "/queue/TEST.QUEUE"
#define STOMP_LOGIN "admin"
#define STOMP_PASSCODE "admin"
struct MemoryStruct {
char *memory;
size_t size;
};
static size_t
WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp)
{
size_t realsize = size * nmemb;
struct MemoryStruct *mem = (structMemoryStruct *)userp;
char *ptr = realloc(mem->memory, mem->size + realsize + 1);
if(!ptr) {
printf("not enough memory (realloc returned NULL)\n");
return 0;
}
mem->memory = ptr;
memcpy(&(mem->memory[mem->size]), contents, realsize);
mem->size += realsize;
mem->memory[mem->size] = 0;
return realsize;
}
void listen_for_messages() {
CURL *curl;
CURLcode res;
struct MemoryStruct chunk;
chunk.memory = malloc(1);
chunk.size = 0;
curl_global_init(CURL_GLOBAL_ALL);
curl = curl_easy_init();
if(curl) {
// 1. 构建 STOMP SUBSCRIBE 帧
char subscribe_frame[512];
int sub_frame_len = snprintf(subscribe_frame, sizeof(subscribe_frame),
"SUBSCRIBE\n"
"destination:%s\n"
"ack:client\n" // 客户端确认模式
"\n"
);
printf("Sending STOMP SUBSCRIBE frame:\n---\n%s\n---\n", subscribe_frame);
// 2. 设置 libcurl 选项
curl_easy_setopt(curl, CURLOPT_URL, STOMP_URL);
curl_easy_setopt(curl, CURLOPT_USERNAME, STOMP_LOGIN);
curl_easy_setopt(curl, CURLOPT_PASSWORD, STOMP_PASSCODE);
curl_easy_setopt(curl, CURLOPT_POST, 1L);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, subscribe_frame);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)sub_frame_len);
curl_easy_setopt(curl, CURLOPT_BINARYTRANSFER, 1L);
// 保持连接,以便接收后续消息
curl_easy_setopt(curl, CURLOPT_TCP_KEEPALIVE, 1L);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&chunk);
// 执行订阅请求
res = curl_easy_perform(curl);
if(res != CURLE_OK) {
fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
goto cleanup;
}
printf("Subscription successful. Waiting for messages...\n");
printf("Server response after SUBSCRIBE:\n---\n%s\n---\n", chunk.memory);
free(chunk.memory); // 清空响应,准备接收消息
chunk.memory = malloc(1);
chunk.size = 0;
// 3. 模拟持续监听 (在实际应用中,你可能需要更复杂的逻辑)
// ActiveMQ 会持续向这个连接推送消息
for(int i = 0; i < 5; i++) { // 假设接收5条消息
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, 0); // 不再发送新数据
res = curl_easy_perform(curl);
if(res != CURLE_OK) {
fprintf(stderr, "curl_easy_perform() failed while listening: %s\n", curl_easy_strerror(res));
break;
}
printf("\n--- Received Message Frame ---\n%s\n--- End of Frame ---\n", chunk.memory);
// 4. 发送 STOMP ACK 确认收到消息
// (需要从消息帧中提取 'message-id',这里简化处理)
char ack_frame[256];
// 注意:真实场景下需要解析 chunk.memory 中的 'message-id'
// 此处仅作演示
int ack_frame_len = snprintf(ack_frame, sizeof(ack_frame),
"ACK\n"
"message-id:dummy-id-%d\n" // 替换为真实的 message-id
"\n"
);
printf("Sending ACK for message...\n");
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, ack_frame);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)ack_frame_len);
res = curl_easy_perform(curl);
if(res != CURLE_OK) {
fprintf(stderr, "Failed to send ACK: %s\n", curl_easy_strerror(res));
}
free(chunk.memory);
chunk.memory = malloc(1);
chunk.size = 0;
}
cleanup:
curl_easy_cleanup(curl);
free(chunk.memory);
}
curl_global_cleanup();
}
int main() {
listen_for_messages();
return 0;
}
编译和运行
- 编译生产者:
gcc stomp_producer.c -o stomp_producer -lcurl
- 编译消费者:
gcc stomp_consumer.c -o stomp_consumer -lcurl
- 运行:
- 先启动消费者,让它开始订阅:
./stomp_consumer
- 然后在另一个终端启动生产者,发送消息:
./stomp_producer
- 你会看到消费者的终端打印出从生产者那里接收到的消息。
- 先启动消费者,让它开始订阅:
使用 POSIX Sockets(更底层)
这种方法不依赖第三方库,能让你更清楚地了解 STOMP 协议的细节,但代码量更大,错误处理更复杂。
socket_producer.c (生产者)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define SERVER_PORT 61613
#define SERVER_IP "127.0.0.1"
#define QUEUE_NAME "/queue/TEST.QUEUE"
#define LOGIN "admin"
#define PASSCODE "admin"
void send_stomp_message(int sock, const char* message) {
char stomp_frame[2048];
int frame_len = snprintf(stomp_frame, sizeof(stomp_frame),
"SEND\n"
"destination:%s\n"
"content-type:text/plain\n"
"\n" // 空行
"%s",
QUEUE_NAME, message
);
printf("Sending:\n---\n%s\n---\n", stomp_frame);
if (send(sock, stomp_frame, frame_len, 0) < 0) {
perror("send failed");
return;
}
char buffer[4096] = {0};
int valread = recv(sock, buffer, 4096, 0);
if (valread > 0) {
printf("Server response:\n---\n%s\n---\n", buffer);
}
}
int main() {
int sock = 0;
struct sockaddr_in serv_addr;
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
printf("\n Socket creation error \n");
return -1;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(SERVER_PORT);
if (inet_pton(AF_INET, SERVER_IP, &serv_addr.sin_addr) <= 0) {
printf("\nInvalid address/ Address not supported \n");
return -1;
}
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
printf("\nConnection Failed \n");
return -1;
}
// STOMP CONNECT
char connect_frame[256];
int connect_len = snprintf(connect_frame, sizeof(connect_frame),
"CONNECT\n"
"login:%s\n"
"passcode:%s\n"
"\n"
"\0", // 终止符,STOMP CONNECT 可以没有体
LOGIN, PASSCODE
);
send(sock, connect_frame, connect_len, 0);
char response[4096] = {0};
recv(sock, response, 4096, 0);
printf("Connect Response:\n---\n%s\n---\n", response);
// 发送消息
send_stomp_message(sock, "Hello from Socket Producer! Message 1.");
send_stomp_message(sock, "Hello from Socket Producer! Message 2.");
close(sock);
return 0;
}
socket_consumer.c (消费者)
消费者使用 Socket 会更复杂,因为它需要处理连接、订阅、接收多条消息和确认(ACK),这通常需要一个状态机来解析 STOMP 帧的流数据,上面的 libcurl 方法已经足够,这里不再赘述 Socket 版本的消费者,因为它会使教程过于冗长。
总结与进阶
- 核心思想:C/C++ 通过 STOMP 协议与 ActiveMQ 通信,STOMP 是一种简单的文本协议,易于手动构建和解析。
- 推荐工具:
libcurl是连接 ActiveMQ 的绝佳选择,它封装了底层的网络细节,让你能专注于构建 STOMP 帧。 - 关键步骤:
- 连接:发送
CONNECT帧,带上用户名和密码。 - 生产者:发送
SEND帧,指定destination(队列名) 和消息体。 - 消费者:发送
SUBSCRIBE帧,指定destination,然后保持连接,持续接收服务器推送的MESSAGE帧,收到消息后,根据需要发送ACK帧。
- 连接:发送
进阶
- 主题 vs. 队列:将代码中的
destination从/queue/...改为/topic/...即可发布/订阅主题消息,注意,主题是“一对多”的,连接断开后,消息会丢失(除非使用持久化订阅)。 - 错误处理:在生产代码中,必须进行更健壮的错误处理,例如检查
send/recv的返回值,处理网络中断,解析 STOMP 响应头中的receipt和error帧。 - 多线程:消费者通常在一个单独的线程中运行,以阻塞方式等待消息,而不阻塞主程序。
- STOMP 高级特性:探索
receipt机制(确保服务器收到消息)、heart-beat(保活)、持久化订阅等。 - 其他库:还有一些专门为 STOMP 设计的 C++ 库,如 cpp-stomp,它们提供了更高级的面向对象的接口,可以简化开发。
