1. 准备工作:安装和配置 ActiveMQ。
  2. STOMP 协议简介:了解我们将使用的通信语言。
  3. C/C++ 客户端开发
    • 使用 libcurl 库(推荐,简单易用)
    • 使用 POSIX Sockets(更底层,有助于理解原理)
  4. 完整代码示例:生产者和消费者。
  5. 总结与进阶

准备工作:安装和运行 ActiveMQ

在开始之前,你需要一个正在运行的 ActiveMQ 服务器。

activemq c 教程
(图片来源网络,侵删)
  1. 下载 ActiveMQ

  2. 启动 ActiveMQ

    • 解压下载的文件。
    • 进入 bin 目录(在 Windows 下是 bin\win64,在 Linux/macOS 下是 bin)。
    • 运行启动脚本:
      • Windows: activemq.bat
      • Linux/macOS: ./activemq start
    • 启动后,你可以通过浏览器访问管理控制台:http://localhost:8161 (默认用户名/密码是 admin/admin)。
  3. 启用 STOMP 协议

    • ActiveMQ 默认已经启用了 STOMP 协议,端口为 61613,你可以在管理控制台的 "Connections" 标签页下看到 stomp 连接器。

STOMP 协议简介

STOMP 是一个简单的协议,客户端和服务器之间通过发送来通信,每个帧由三部分组成:

activemq c 教程
(图片来源网络,侵删)
  • 命令:如 CONNECT, SEND, SUBSCRIBE, UNSUBSCRIBE, DISCONNECT 等。
  • :键值对,提供关于帧的元信息,如 destination (队列/主题名), receipt (请求确认), content-type (消息类型)。
  • :实际的消息内容,可以是文本或二进制数据。

发送一条消息到队列 TEST.QUEUE

SEND
destination:/queue/TEST.QUEUE
content-type:text/plain
Hello from C Client!

一个空行 (\n\n) 分隔头和体。


C/C++ 客户端开发

我们将实现两个程序:一个生产者(发送消息)和一个消费者(接收消息)。

使用 libcurl 库(推荐)

libcurl 是一个强大的客户端 URL 传输库,支持多种协议,包括 HTTP 和 STOMP(通过原始 TCP 模式),这使得它成为连接 ActiveMQ 的绝佳选择。

步骤 1: 安装 libcurl

  • Debian/Ubuntu: sudo apt-get install libcurl4-openssl-dev
  • RedHat/CentOS: sudo yum install libcurl-devel
  • Windows: 从 curl 官网 下载预编译的二进制包,并将其包含路径和库路径添加到你的 IDE 或编译器中。

步骤 2: C/C++ 代码示例

下面是使用 libcurl 实现 STOMP 生产者和消费者的代码。

stomp_producer.c (生产者)

#include <stdio.h>
#include <string.h>
#include <curl/curl.h>
// 定义 ActiveMQ STOMP 服务器地址
#define STOMP_URL "stomp://localhost:61613"
#define QUEUE_NAME "/queue/TEST.QUEUE"
#define STOMP_LOGIN "admin"
#define STOMP_PASSCODE "admin"
// 用于 libcurl 写入回调的缓冲区
struct MemoryStruct {
  char *memory;
  size_t size;
};
static size_t
WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp)
{
  size_t realsize = size * nmemb;
  struct MemoryStruct *mem = (struct MemoryStruct *)userp;
  char *ptr = realloc(mem->memory, mem->size + realsize + 1);
  if(!ptr) {
    /* out of memory! */
    printf("not enough memory (realloc returned NULL)\n");
    return 0;
  }
  mem->memory = ptr;
  memcpy(&(mem->memory[mem->size]), contents, realsize);
  mem->size += realsize;
  mem->memory[mem->size] = 0;
  return realsize;
}
void send_stomp_message(const char* message) {
    CURL *curl;
    CURLcode res;
    struct MemoryStruct chunk;
    chunk.memory = malloc(1);  // will be grown by realloc
    chunk.size = 0;
    curl_global_init(CURL_GLOBAL_ALL);
    curl = curl_easy_init();
    if(curl) {
        // 1. 构建 STOMP SEND 帧体
        char body[1024];
        int body_len = snprintf(body, sizeof(body), "%s", message);
        // 2. 构建完整的 STOMP SEND 帧
        char stomp_frame[2048];
        int frame_len = snprintf(stomp_frame, sizeof(stomp_frame),
            "SEND\n"
            "destination:%s\n"
            "content-type:text/plain\n"
            "\n" // 空行分隔头和体
            "%s",
            QUEUE_NAME, message
        );
        printf("Sending STOMP frame:\n---\n%s\n---\n", stomp_frame);
        // 3. 设置 libcurl 选项
        curl_easy_setopt(curl, CURLOPT_URL, STOMP_URL);
        curl_easy_setopt(curl, CURLOPT_USERNAME, STOMP_LOGIN);
        curl_easy_setopt(curl, CURLOPT_PASSWORD, STOMP_PASSCODE);
        // 以 POST 方式发送原始数据
        curl_easy_setopt(curl, CURLOPT_POST, 1L);
        curl_easy_setopt(curl, CURLOPT_POSTFIELDS, stomp_frame);
        curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)frame_len);
        // 以二进制模式发送,防止 libcurl 修改换行符
        curl_easy_setopt(curl, CURLOPT_BINARYTRANSFER, 1L);
        // 设置回调函数来接收服务器响应
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&chunk);
        // 执行请求
        res = curl_easy_perform(curl);
        // 检查错误
        if(res != CURLE_OK) {
            fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
        } else {
            printf("Server response:\n---\n%s\n---\n", chunk.memory);
        }
        // 清理
        curl_easy_cleanup(curl);
        free(chunk.memory);
    }
    curl_global_cleanup();
}
int main() {
    send_stomp_message("Hello from C Producer! Message 1.");
    send_stomp_message("Hello from C Producer! Message 2.");
    return 0;
}

stomp_consumer.c (消费者) 消费者需要先订阅,然后保持连接以持续接收消息,这里我们使用 CURLOPT_TCP_KEEPALIVE 和循环来模拟持续监听。

#include <stdio.h>
#include <string.h>
#include <curl/curl.h>
#define STOMP_URL "stomp://localhost:61613"
#define QUEUE_NAME "/queue/TEST.QUEUE"
#define STOMP_LOGIN "admin"
#define STOMP_PASSCODE "admin"
struct MemoryStruct {
  char *memory;
  size_t size;
};
static size_t
WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp)
{
  size_t realsize = size * nmemb;
  struct MemoryStruct *mem = (structMemoryStruct *)userp;
  char *ptr = realloc(mem->memory, mem->size + realsize + 1);
  if(!ptr) {
    printf("not enough memory (realloc returned NULL)\n");
    return 0;
  }
  mem->memory = ptr;
  memcpy(&(mem->memory[mem->size]), contents, realsize);
  mem->size += realsize;
  mem->memory[mem->size] = 0;
  return realsize;
}
void listen_for_messages() {
    CURL *curl;
    CURLcode res;
    struct MemoryStruct chunk;
    chunk.memory = malloc(1);
    chunk.size = 0;
    curl_global_init(CURL_GLOBAL_ALL);
    curl = curl_easy_init();
    if(curl) {
        // 1. 构建 STOMP SUBSCRIBE 帧
        char subscribe_frame[512];
        int sub_frame_len = snprintf(subscribe_frame, sizeof(subscribe_frame),
            "SUBSCRIBE\n"
            "destination:%s\n"
            "ack:client\n" // 客户端确认模式
            "\n"
        );
        printf("Sending STOMP SUBSCRIBE frame:\n---\n%s\n---\n", subscribe_frame);
        // 2. 设置 libcurl 选项
        curl_easy_setopt(curl, CURLOPT_URL, STOMP_URL);
        curl_easy_setopt(curl, CURLOPT_USERNAME, STOMP_LOGIN);
        curl_easy_setopt(curl, CURLOPT_PASSWORD, STOMP_PASSCODE);
        curl_easy_setopt(curl, CURLOPT_POST, 1L);
        curl_easy_setopt(curl, CURLOPT_POSTFIELDS, subscribe_frame);
        curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)sub_frame_len);
        curl_easy_setopt(curl, CURLOPT_BINARYTRANSFER, 1L);
        // 保持连接,以便接收后续消息
        curl_easy_setopt(curl, CURLOPT_TCP_KEEPALIVE, 1L);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&chunk);
        // 执行订阅请求
        res = curl_easy_perform(curl);
        if(res != CURLE_OK) {
            fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
            goto cleanup;
        }
        printf("Subscription successful. Waiting for messages...\n");
        printf("Server response after SUBSCRIBE:\n---\n%s\n---\n", chunk.memory);
        free(chunk.memory); // 清空响应,准备接收消息
        chunk.memory = malloc(1);
        chunk.size = 0;
        // 3. 模拟持续监听 (在实际应用中,你可能需要更复杂的逻辑)
        // ActiveMQ 会持续向这个连接推送消息
        for(int i = 0; i < 5; i++) { // 假设接收5条消息
            curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, 0); // 不再发送新数据
            res = curl_easy_perform(curl);
            if(res != CURLE_OK) {
                fprintf(stderr, "curl_easy_perform() failed while listening: %s\n", curl_easy_strerror(res));
                break;
            }
            printf("\n--- Received Message Frame ---\n%s\n--- End of Frame ---\n", chunk.memory);
            // 4. 发送 STOMP ACK 确认收到消息
            // (需要从消息帧中提取 'message-id',这里简化处理)
            char ack_frame[256];
            // 注意:真实场景下需要解析 chunk.memory 中的 'message-id'
            // 此处仅作演示
            int ack_frame_len = snprintf(ack_frame, sizeof(ack_frame),
                "ACK\n"
                "message-id:dummy-id-%d\n" // 替换为真实的 message-id
                "\n"
            );
            printf("Sending ACK for message...\n");
            curl_easy_setopt(curl, CURLOPT_POSTFIELDS, ack_frame);
            curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)ack_frame_len);
            res = curl_easy_perform(curl);
            if(res != CURLE_OK) {
                fprintf(stderr, "Failed to send ACK: %s\n", curl_easy_strerror(res));
            }
            free(chunk.memory);
            chunk.memory = malloc(1);
            chunk.size = 0;
        }
    cleanup:
        curl_easy_cleanup(curl);
        free(chunk.memory);
    }
    curl_global_cleanup();
}
int main() {
    listen_for_messages();
    return 0;
}

编译和运行

  1. 编译生产者:
    gcc stomp_producer.c -o stomp_producer -lcurl
  2. 编译消费者:
    gcc stomp_consumer.c -o stomp_consumer -lcurl
  3. 运行:
    • 先启动消费者,让它开始订阅:
      ./stomp_consumer
    • 然后在另一个终端启动生产者,发送消息:
      ./stomp_producer
    • 你会看到消费者的终端打印出从生产者那里接收到的消息。

使用 POSIX Sockets(更底层)

这种方法不依赖第三方库,能让你更清楚地了解 STOMP 协议的细节,但代码量更大,错误处理更复杂。

socket_producer.c (生产者)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define SERVER_PORT 61613
#define SERVER_IP "127.0.0.1"
#define QUEUE_NAME "/queue/TEST.QUEUE"
#define LOGIN "admin"
#define PASSCODE "admin"
void send_stomp_message(int sock, const char* message) {
    char stomp_frame[2048];
    int frame_len = snprintf(stomp_frame, sizeof(stomp_frame),
        "SEND\n"
        "destination:%s\n"
        "content-type:text/plain\n"
        "\n" // 空行
        "%s",
        QUEUE_NAME, message
    );
    printf("Sending:\n---\n%s\n---\n", stomp_frame);
    if (send(sock, stomp_frame, frame_len, 0) < 0) {
        perror("send failed");
        return;
    }
    char buffer[4096] = {0};
    int valread = recv(sock, buffer, 4096, 0);
    if (valread > 0) {
        printf("Server response:\n---\n%s\n---\n", buffer);
    }
}
int main() {
    int sock = 0;
    struct sockaddr_in serv_addr;
    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        printf("\n Socket creation error \n");
        return -1;
    }
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(SERVER_PORT);
    if (inet_pton(AF_INET, SERVER_IP, &serv_addr.sin_addr) <= 0) {
        printf("\nInvalid address/ Address not supported \n");
        return -1;
    }
    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        printf("\nConnection Failed \n");
        return -1;
    }
    // STOMP CONNECT
    char connect_frame[256];
    int connect_len = snprintf(connect_frame, sizeof(connect_frame),
        "CONNECT\n"
        "login:%s\n"
        "passcode:%s\n"
        "\n"
        "\0", // 终止符,STOMP CONNECT 可以没有体
        LOGIN, PASSCODE
    );
    send(sock, connect_frame, connect_len, 0);
    char response[4096] = {0};
    recv(sock, response, 4096, 0);
    printf("Connect Response:\n---\n%s\n---\n", response);
    // 发送消息
    send_stomp_message(sock, "Hello from Socket Producer! Message 1.");
    send_stomp_message(sock, "Hello from Socket Producer! Message 2.");
    close(sock);
    return 0;
}

socket_consumer.c (消费者) 消费者使用 Socket 会更复杂,因为它需要处理连接、订阅、接收多条消息和确认(ACK),这通常需要一个状态机来解析 STOMP 帧的流数据,上面的 libcurl 方法已经足够,这里不再赘述 Socket 版本的消费者,因为它会使教程过于冗长。


总结与进阶

  • 核心思想:C/C++ 通过 STOMP 协议与 ActiveMQ 通信,STOMP 是一种简单的文本协议,易于手动构建和解析。
  • 推荐工具libcurl 是连接 ActiveMQ 的绝佳选择,它封装了底层的网络细节,让你能专注于构建 STOMP 帧。
  • 关键步骤
    1. 连接:发送 CONNECT 帧,带上用户名和密码。
    2. 生产者:发送 SEND 帧,指定 destination (队列名) 和消息体。
    3. 消费者:发送 SUBSCRIBE 帧,指定 destination,然后保持连接,持续接收服务器推送的 MESSAGE 帧,收到消息后,根据需要发送 ACK 帧。

进阶

  • 主题 vs. 队列:将代码中的 destination/queue/... 改为 /topic/... 即可发布/订阅主题消息,注意,主题是“一对多”的,连接断开后,消息会丢失(除非使用持久化订阅)。
  • 错误处理:在生产代码中,必须进行更健壮的错误处理,例如检查 send/recv 的返回值,处理网络中断,解析 STOMP 响应头中的 receipterror 帧。
  • 多线程:消费者通常在一个单独的线程中运行,以阻塞方式等待消息,而不阻塞主程序。
  • STOMP 高级特性:探索 receipt 机制(确保服务器收到消息)、heart-beat(保活)、持久化订阅等。
  • 其他库:还有一些专门为 STOMP 设计的 C++ 库,如 cpp-stomp,它们提供了更高级的面向对象的接口,可以简化开发。