项目

Kafka分布式事件总线集成

本文档详细说明如何将Kafka配置为分布式事件总线提供程序。有关分布式事件总线系统的使用方法,请参阅 分布式事件总线文档

安装步骤

使用ABP CLI将Volo.Abp.EventBus.Kafka NuGet包添加到您的项目:

  • 若尚未安装,请先安装 ABP CLI
  • 在需要添加Volo.Abp.EventBus.Kafka包的.csproj文件所在目录打开命令行(终端)
  • 运行abp add-package Volo.Abp.EventBus.Kafka命令

如需手动安装,请将Volo.Abp.EventBus.Kafka NuGet包安装到项目,并在项目的ABP模块类中添加[DependsOn(typeof(AbpEventBusKafkaModule))]特性

配置说明

您可以通过标准配置系统进行配置,例如使用 appsettings.json 文件或 选项类

appsettings.json文件配置

这是配置Kafka设置的最简方法,同时非常灵活,因为您可以使用AspNet Core支持的任何其他配置源(如环境变量)

示例:使用默认配置连接本地Kafka服务器的最小配置

{
  "Kafka": {
    "Connections": {
      "Default": {
        "BootstrapServers": "localhost:9092"
      }
    },
    "EventBus": {
      "GroupId": "MyGroupId",
      "TopicName": "MyTopicName"
    }
  }
}
  • MyGroupId是此应用程序名称,在Kafka中用作消费者组ID
  • MyTopicName主题名称

详细了解这些选项,请参阅Kafka文档

连接配置

如需连接非本地主机服务器,需要配置连接属性

示例:指定主机名(作为IP地址)

{
  "Kafka": {
    "Connections": {
      "Default": {
        "BootstrapServers": "123.123.123.123:9092"
      }
    },
    "EventBus": {
      "GroupId": "MyGroupId",
      "TopicName": "MyTopicName"
    }
  }
}

允许定义多个连接。这种情况下,可指定事件总线使用的连接

示例:声明两个连接并选用其中一个用于事件总线

{
  "Kafka": {
    "Connections": {
      "Default": {
        "BootstrapServers": "123.123.123.123:9092"
      },
      "SecondConnection": {
        "BootstrapServers": "321.321.321.321:9092"
      }
    },
    "EventBus": {
      "GroupId": "MyGroupId",
      "TopicName": "MyTopicName",
      "ConnectionName": "SecondConnection"
    }
  }
}

这样可以在应用中使用多个Kafka集群,但为事件总线选择其中之一

您可以使用任何ClientConfig属性作为连接属性

示例:指定套接字超时时间

{
  "Kafka": {
    "Connections": {
      "Default": {
        "BootstrapServers": "123.123.123.123:9092",
        "SocketTimeoutMs": 60000
      }
    }
  }
}

选项类配置

AbpKafkaOptionsAbpKafkaEventBusOptions类可用于配置Kafka的连接字符串和事件总线选项

您可以在模块的ConfigureServices方法中配置这些选项

示例:配置连接

Configure<AbpKafkaOptions>(options =>
{
    options.Connections.Default.BootstrapServers = "123.123.123.123:9092";
    options.Connections.Default.SaslUsername = "user";
    options.Connections.Default.SaslPassword = "pwd";
});

示例:配置消费者配置

Configure<AbpKafkaOptions>(options =>
{
    options.ConfigureConsumer = config =>
    {
        config.GroupId = "MyGroupId";
        config.EnableAutoCommit = false;
    };
});

示例:配置生产者配置

Configure<AbpKafkaOptions>(options =>
{
    options.ConfigureProducer = config =>
    {
        config.MessageTimeoutMs = 6000;
        config.Acks = Acks.All;
    };
});

示例:配置主题规范

Configure<AbpKafkaOptions>(options =>
{
    options.ConfigureTopic = specification =>
    {
        specification.ReplicationFactor = 3;
        specification.NumPartitions = 3;
    };
});

这些选项类的使用可与appsettings.json方式结合。在代码中配置选项属性将覆盖配置文件中的值

在本文档中