The documentation you are viewing is for Dapr v1.3 which is an older version of Dapr. For up-to-date documentation, see the latest version.

RabbitMQ

RabbitMQ pubsub组件的详细文档

配置

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rabbitmq-pubsub
  namespace: default
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://localhost:5672"
  - name: durable
    value: "false"
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: deliveryMode
    value: "0"
  - name: requeueInFailure
    value: "false"
  - name: prefetchCount
    value: "0"
  - name: reconnectWait
    value: "0"
  - name: concurrencyMode
    value: parallel
  - name: backOffPolicy
    value: "exponential"
  - name: backOffInitialInterval
    value: "100"
  - name: backOffMaxRetries
    value: "16"

元数据字段规范

字段 必填 详情 示例
host Y Rabbitmq 的连接地址 amqp://user:pass@localhost:5672
durable N 是否使用durable队列, 默认值为 "false" 默认值为 "false" "true", "false"
deletedWhenUnused N 是否应将队列配置为 自动删除 默认值为 "true" "true", "false"
autoAck N 队列的消费者是否应该auto-ack消息 默认值为 "false" 默认值为 "false" "true", "false"
deliveryMode N 发布消息时的持久化模式, 默认值为 "0". 值为"2"时RabbitMQ会进行持久化,其他值反之 "0", "2"
requeueInFailure N 在发送否定应答失败的情况下,是否进行重排。 默认值为 "false" "true", "false"
prefetchCount N Number of messages to prefetch. 生产环境中需要考虑设置一个非零值。 该值默认为"0",这意味着所有可用消息都将被预先提取 "2"
reconnectWait N 如果发生连接失败,在重新连接之前需要等待多长时间(秒) "0"
concurrencyMode N 默认值是parallel,表示允许并行处理多个消息(如果配置了app-max-concurrency,最大并行数会受到该值限制), 设置为single可禁用并行处理, 大多数情况下没必要去这么做 设置为single可禁用并行处理, 大多数情况下没必要去这么做 parallel, single
backOffPolicy N 重试策略,"constant"是一个总是返回相同的退避延迟的退避策略。 "exponential" 是一种退避策略,它使用呈指数级增长的随机化函数增加每次重试尝试的退避周期。 默认为 "constant" constantexponential
backOffDuration N 固定间隔仅在策略恒定时生效。 有两种有效的格式,一种是带有单位后缀格式的分数,另一种是将以毫秒为单位处理的纯数字格式。 有效的时间单位为"ns”、“us”(或"μs”)、“ms”、“s”、“m”、“h”。 默认为 "5s" "5s""5000"
backOffInitialInterval N 重试时的回退初始间隔。 仅当策略是指数型时才生效。 有两种有效的格式,一种是带有单位后缀格式的分数,另一种是将以毫秒为单位处理的纯数字格式。 有效的时间单位为"ns”、“us”(或"μs”)、“ms”、“s”、“m”、“h”。 默认值为 "500" "50"
backOffMaxInterval N 重试时的回退初始间隔。 仅当策略是指数型时才生效。 有两种有效的格式,一种是带有单位后缀格式的分数,另一种是将以毫秒为单位处理的纯数字格式。 有效的时间单位为"ns”、“us”(或"μs”)、“ms”、“s”、“m”、“h”。 默认为 "60s" "60000"
backOffMaxRetries N 返回错误前重试处理消息的最大次数。 默认为 "0" 这意味着组件不会重试处理消息。 "-1" 将无限期重试,直到处理完消息或关闭应用程序。 任何正数都被视为最大重试计数。 "3"
backOffRandomizationFactor N 随机系数,介于 1 和 0 之间,包括 0 但不是 1。 随机间隔 = 重试间隔 * (1 ±backOffRandomizationFactor)。 默认值为 "0.5". "0.5"
backOffMultiplier N 策略的退避倍数。 通过将间隔乘以倍数来递增间隔。 默认值为 "1.5" "1.5"
backOffMaxElapsedTime N 在 MaxElapsedTime 之后,ExponentialBackOff 返回 Stop。 有两种有效的格式,一种是带有单位后缀格式的分数,另一种是将以毫秒为单位处理的纯数字格式。 有效的时间单位为"ns”、“us”(或"μs”)、“ms”、“s”、“m”、“h”。 默认为 "15m" "15m"

退避策略介绍

回退重试策略可以指示 dapr sidecar 如何重新发送消息。 默认情况下,重试策略处于关闭状态,这意味着 sidecar 将向服务发送一次消息。 当服务返回结果时,无论消息是否正确,该消息都将被标记为已消费。 以上是基于 autoAck 的条件, requeueInFailure 设置为false(如果 requeueInFailure 设置为true,则消息将获得第二次机会)。

但在某些情况下,您可能希望 dapr 使用(指数或常量)回退策略重试推送消息,直到消息正常处理或重试次数用尽。 当您的服务异常发生故障但 sidecar 没有一起停止时,这可能很有用。 添加回退策略将在服务停机期间重试消息推送,而不是将这些消息标记为已消费。

创建RabbitMQ服务


你可以使用Docker在本地运行RabbitMQ :

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3

然后你可以通过localhost:5672与服务器交互。


在 Kubernetes 上安装 RabbitMQ 最简单的方法是使用 Helm chart

helm install rabbitmq stable/rabbitmq

根据Helm图表的输出,得到用户名和密码。

这会把 RabbitMQ 安装到 default 命名空间中, 这会把 RabbitMQ 安装到 default 命名空间中, 要与RabbitMQ进行交互,请使用以下方法找到服务:kubectl get svc rabbitmq

如果使用上面的示例进行安装,RabbitMQ服务器的客户端地址是:

rabbitmq.default.svc.cluster.local:5672

相关链接