NCCL源码解析①:初始化及ncclUniqueId的产生

作者|KIDGINBROOK
更新|潘丽晨
int e = cmd; \if( e != MPI_SUCCESS ) { \printf("Failed: MPI error %s:%d '%d'\n", \__FILE__,__LINE__, e); \exit(EXIT_FAILURE); \} \} while(0)cudaError_t e = cmd; \if( e != cudaSuccess ) { \printf("Failed: Cuda error %s:%d '%s'\n", \__FILE__,__LINE__,cudaGetErrorString(e)); \exit(EXIT_FAILURE); \} \} while(0)ncclResult_t r = cmd; \if (r!= ncclSuccess) { \printf("Failed, NCCL error %s:%d '%s'\n", \__FILE__,__LINE__,ncclGetErrorString(r)); \exit(EXIT_FAILURE); \} \} while(0)static uint64_t getHostHash(const char* string) {// Based on DJB2a, result = result * 33 ^ charuint64_t result = 5381;for (int c = 0; string[c] != '\0'; c++){result = ((result << 5) + result) ^ string[c];}return result;}static void getHostName(char* hostname, int maxlen) {gethostname(hostname, maxlen);for (int i=0; i< maxlen; i++) {if (hostname[i] == '.') {hostname[i] = '\0';return;}}}int main(int argc, char* argv[]){int size = 32*1024*1024;int myRank, nRanks, localRank = 0;//initializing MPIMPICHECK(MPI_Init(&argc, &argv));MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));//calculating localRank which is used in selecting a GPUuint64_t hostHashs[nRanks];char hostname[1024];getHostName(hostname, 1024);hostHashs[myRank] = getHostHash(hostname);MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));for (int p=0; p<nRanks; p++) {if (p == myRank) break;if (hostHashs[p] == hostHashs[myRank]) localRank++;}//each process is using two GPUsint nDev = 2;float** sendbuff = (float**)malloc(nDev * sizeof(float*));float** recvbuff = (float**)malloc(nDev * sizeof(float*));cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);//picking GPUs based on localRankfor (int i = 0; i < nDev; ++i) {CUDACHECK(cudaSetDevice(localRank*nDev + i));CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float)));CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float)));CUDACHECK(cudaMemset(sendbuff[i], 1, size * 
sizeof(float)));CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));CUDACHECK(cudaStreamCreate(s+i));}ncclUniqueId id;ncclComm_t comms[nDev];//generating NCCL unique ID at one process and broadcasting it to allif (myRank == 0) ncclGetUniqueId(&id);MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));//initializing NCCL, group API is required around ncclCommInitRank as it is//called across multiple GPUs in each thread/processNCCLCHECK(ncclGroupStart());for (int i=0; i<nDev; i++) {CUDACHECK(cudaSetDevice(localRank*nDev + i));NCCLCHECK(ncclCommInitRank(comms+i, nRanks*nDev, id, myRank*nDev + i));}NCCLCHECK(ncclGroupEnd());//calling NCCL communication API. Group API is required when using//multiple devices per thread/processNCCLCHECK(ncclGroupStart());for (int i=0; i<nDev; i++)NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,comms[i], s[i]));NCCLCHECK(ncclGroupEnd());//synchronizing on CUDA stream to complete NCCL communicationfor (int i=0; i<nDev; i++)CUDACHECK(cudaStreamSynchronize(s[i]));//freeing device memoryfor (int i=0; i<nDev; i++) {CUDACHECK(cudaFree(sendbuff[i]));CUDACHECK(cudaFree(recvbuff[i]));}//finalizing NCCLfor (int i=0; i<nDev; i++) {ncclCommDestroy(comms[i]);}//finalizing MPIMPICHECK(MPI_Finalize());printf("[MPI Rank %d] Success \n", myRank);return 0;}
/*
 * Fill *out with a new unique id.
 *
 * Runs ncclInit() first, then validates the out pointer with PtrCheck, and
 * finally delegates to bootstrapGetUniqueId. NCCLCHECK is the library's own
 * macro (defined outside this excerpt) -- presumably it propagates a failing
 * result to the caller.
 */
ncclResult_t ncclGetUniqueId(ncclUniqueId* out) {
  NCCLCHECK(ncclInit());
  NCCLCHECK(PtrCheck(out, "GetUniqueId", "out"));

  ncclResult_t idResult = bootstrapGetUniqueId(out);
  return idResult;
}
/*
 * Select the network transport stored in ncclNet.
 *
 * Order of preference: whatever initNetPlugin installed, then ncclNetIb,
 * then ncclNetSocket. The bootstrap network is initialised in every case.
 */
ncclResult_t initNet() {
  // Always initialize bootstrap network
  NCCLCHECK(bootstrapNetInit());

  NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet));

  if (ncclNet == NULL) {
    // No plugin transport was installed: fall back to the built-in ones.
    if (initNet(&ncclNetIb) != ncclSuccess) {
      NCCLCHECK(initNet(&ncclNetSocket));
      ncclNet = &ncclNetSocket;
    } else {
      ncclNet = &ncclNetIb;
    }
  }
  return ncclSuccess;
}
static int findInterfaces(const char* prefixList, char* names, union socketAddress *addrs, int sock_family, int maxIfNameSize, int maxIfs) {struct netIf userIfs[MAX_IFS];bool searchNot = prefixList && prefixList[0] == '^';if (searchNot) prefixList++;bool searchExact = prefixList && prefixList[0] == '=';if (searchExact) prefixList++;int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS);int found = 0;struct ifaddrs *interfaces, *interface;getifaddrs(&interfaces);for (interface = interfaces; interface && found < maxIfs; interface = interface->ifa_next) {if (interface->ifa_addr == NULL) continue;int family = interface->ifa_addr->sa_family;if (family != AF_INET && family != AF_INET6)continue;if (sock_family != -1 && family != sock_family)continue;if (family == AF_INET6) {struct sockaddr_in6* sa = (struct sockaddr_in6*)(interface->ifa_addr);if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue;}if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) {continue;}bool duplicate = false;for (int i = 0; i < found; i++) {if (strcmp(interface->ifa_name, names+i*maxIfNameSize) == 0) { duplicate = true; break; }}if (!duplicate) {strncpy(names+found*maxIfNameSize, interface->ifa_name, maxIfNameSize);int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);memcpy(addrs+found, interface->ifa_addr, salen);found++;}}freeifaddrs(interfaces);return found;}
/*
 * Enumerate usable IB/RoCE ports into ncclIbDevs / ncclNIbDevs.
 *
 * Must be called with ncclIbLock held. It never touches the lock itself, so
 * every early return below is safe and the caller has one unlock point.
 * Sets ncclNIbDevs to 0 before probing; the count therefore stays at
 * whatever was reached when an error cuts the scan short.
 */
static ncclResult_t ncclIbDetectDevices() {
  static int shownIbHcaEnv = 0;

  ncclNIbDevs = 0;
  if (findInterfaces(ncclIbIfName, &ncclIbIfAddr, MAX_IF_NAME_SIZE, 1) != 1) {
    WARN("NET/IB : No IP interface found.");
    return ncclInternalError;
  }

  // Detect IB cards
  int nIbDevs;
  struct ibv_device** devices;

  // Check if user defined which IB device:port to use
  char* userIbEnv = getenv("NCCL_IB_HCA");
  if (userIbEnv != NULL && shownIbHcaEnv++ == 0) INFO(NCCL_NET|NCCL_ENV, "NCCL_IB_HCA set to %s", userIbEnv);
  struct netIf userIfs[MAX_IB_DEVS];
  bool searchNot = userIbEnv && userIbEnv[0] == '^';
  if (searchNot) userIbEnv++;
  bool searchExact = userIbEnv && userIbEnv[0] == '=';
  if (searchExact) userIbEnv++;
  int nUserIfs = parseStringList(userIbEnv, userIfs, MAX_IB_DEVS);

  if (ncclSuccess != wrap_ibv_get_device_list(&devices, &nIbDevs)) return ncclInternalError;

  for (int d=0; d<nIbDevs && ncclNIbDevs<MAX_IB_DEVS; d++) {
    struct ibv_context * context;
    if (ncclSuccess != wrap_ibv_open_device(&context, devices[d]) || context == NULL) {
      WARN("NET/IB : Unable to open device %s", devices[d]->name);
      continue;
    }
    int nPorts = 0;
    struct ibv_device_attr devAttr;
    memset(&devAttr, 0, sizeof(devAttr));
    if (ncclSuccess != wrap_ibv_query_device(context, &devAttr)) {
      WARN("NET/IB : Unable to query device %s", devices[d]->name);
      if (ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
      continue;
    }
    for (int port = 1; port <= devAttr.phys_port_cnt; port++) {
      struct ibv_port_attr portAttr;
      if (ncclSuccess != wrap_ibv_query_port(context, port, &portAttr)) {
        WARN("NET/IB : Unable to query port %d", port);
        continue;
      }
      if (portAttr.state != IBV_PORT_ACTIVE) continue;
      if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND
          && portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) continue;

      // check against user specified HCAs/ports
      if (! (matchIfList(devices[d]->name, port, userIfs, nUserIfs, searchExact) ^ searchNot)) {
        continue;
      }
      TRACE(NCCL_INIT|NCCL_NET,"NET/IB: [%d] %s:%d/%s ", d, devices[d]->name, port,
          portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
      ncclIbDevs[ncclNIbDevs].device = d;
      ncclIbDevs[ncclNIbDevs].guid = devAttr.sys_image_guid;
      ncclIbDevs[ncclNIbDevs].port = port;
      ncclIbDevs[ncclNIbDevs].link = portAttr.link_layer;
      ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width);
      ncclIbDevs[ncclNIbDevs].context = context;
      strncpy(ncclIbDevs[ncclNIbDevs].devName, devices[d]->name, MAXNAMESIZE);
      NCCLCHECK(ncclIbGetPciPath(ncclIbDevs[ncclNIbDevs].devName, &ncclIbDevs[ncclNIbDevs].pciPath, &ncclIbDevs[ncclNIbDevs].realPort));
      ncclIbDevs[ncclNIbDevs].maxQp = devAttr.max_qp;
      ncclNIbDevs++;
      nPorts++;
      // NOTE(review): the result of pthread_create is not checked here.
      pthread_create(&ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, context);
    }
    // A device with no usable port is closed again; otherwise the context
    // stays open because ncclIbDevs[] refers to it.
    if (nPorts == 0 && ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
  }
  if (nIbDevs && (ncclSuccess != wrap_ibv_free_device_list(devices))) { return ncclInternalError; }
  return ncclSuccess;
}

/*
 * Initialise the IB transport: load the verbs symbols, honour the disable
 * parameter, and detect the IB devices the first time through.
 *
 * Returns ncclInternalError when the verbs symbols cannot be loaded, when IB
 * is disabled, or when detection fails; ncclSuccess otherwise (including
 * when no device was found). logFunction is not used by this body.
 *
 * Detection runs under ncclIbLock, and the lock is released on every path,
 * including detection errors.
 */
ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) {
  if(wrap_ibv_symbols() != ncclSuccess) { return ncclInternalError; }
  if (ncclParamIbDisable()) return ncclInternalError;

  if (ncclNIbDevs == -1) {
    pthread_mutex_lock(&ncclIbLock);
    wrap_ibv_fork_init();

    // Re-check under the lock: another thread may have finished detection.
    ncclResult_t detectResult = ncclSuccess;
    if (ncclNIbDevs == -1) {
      detectResult = ncclIbDetectDevices();
    }

    if (detectResult == ncclSuccess) {
      if (ncclNIbDevs == 0) {
        INFO(NCCL_INIT|NCCL_NET, "NET/IB : No device found.");
      } else {
        char line[1024];
        line[0] = '\0';
        for (int d=0; d<ncclNIbDevs; d++) {
          snprintf(line+strlen(line), 1023-strlen(line), " [%d]%s:%d/%s", d, ncclIbDevs[d].devName,
              ncclIbDevs[d].port, ncclIbDevs[d].link == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
        }
        line[1023] = '\0';
        char addrline[1024];
        INFO(NCCL_INIT|NCCL_NET, "NET/IB : Using%s ; OOB %s:%s", line, ncclIbIfName, socketToString(&ncclIbIfAddr.sa, addrline));
      }
    }

    pthread_mutex_unlock(&ncclIbLock);
    if (detectResult != ncclSuccess) return detectResult;
  }
  return ncclSuccess;
}
/*
 * Open the bootstrap listen endpoint and start the bootstrapRoot thread.
 *
 * id         reinterpreted as the ncclNetHandle_t that bootstrapNetListen
 *            fills in.
 * idFromEnv  selects dontCareIf instead of interface 0 for listening.
 *
 * Returns ncclSuccess once the thread is running, ncclSystemError when the
 * thread cannot be created, or whatever NCCLCHECK yields when listening
 * fails.
 */
ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) {
  ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;
  void* listenComm;
  NCCLCHECK(bootstrapNetListen(idFromEnv ? dontCareIf : 0, netHandle, &listenComm));

  pthread_t thread;
  if (pthread_create(&thread, NULL, bootstrapRoot, listenComm) != 0) {
    // NOTE(review): listenComm stays open on this path; the matching close
    // routine is not visible in this excerpt.
    return ncclSystemError;
  }
  // The handle is dropped here, so nobody can join: detach the thread so
  // its resources are reclaimed when it exits.
  pthread_detach(thread);
  return ncclSuccess;
}
/*
 * Create a listening bootstrap endpoint.
 *
 * dev         when >= 0, the index handed to bootstrapNetGetSocketAddr to
 *             pick the listen address; the findSubnetIf branch is elided
 *             ("{...}") in this excerpt; for any other value the address
 *             already stored in netHandle is used.
 * netHandle   viewed as a union socketAddress; createListenSocket receives
 *             it as the address to listen on.
 * listenComm  output: the newly created struct bootstrapNetComm.
 */
static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm) {
  // The handle is reused as storage for the socket address.
  union socketAddress* connectAddr = (union socketAddress*) netHandle;
  static_assert(sizeof(union socketAddress) < NCCL_NET_HANDLE_MAXSIZE, "union socketAddress size is too large");
  // if dev >= 0, listen based on dev
  if (dev >= 0) {
    NCCLCHECK(bootstrapNetGetSocketAddr(dev, connectAddr));
  } else if (dev == findSubnetIf) {...} // Otherwise, handle stores a local address
  struct bootstrapNetComm* comm;
  NCCLCHECK(bootstrapNetNewComm(&comm));
  NCCLCHECK(createListenSocket(&comm->fd, connectAddr));
  *listenComm = comm;
  return ncclSuccess;
}
/*
 * Copy the address of bootstrap interface number dev into *addr.
 *
 * Returns ncclInternalError when dev is outside [0, bootstrapNetIfs),
 * ncclSuccess otherwise.
 */
static ncclResult_t bootstrapNetGetSocketAddr(int dev, union socketAddress* addr) {
  // Reject negative indices too: they would read before the array.
  if (dev < 0 || dev >= bootstrapNetIfs) return ncclInternalError;
  memcpy(addr, bootstrapNetIfAddrs+dev, sizeof(*addr));
  return ncclSuccess;
}
/* State of one bootstrap network endpoint. */
struct bootstrapNetComm {
  int fd;  /* descriptor of the endpoint's socket */
};
/*
 * Configure, bind and start listening on an already created socket.
 *
 * Kept separate from createListenSocket so that the caller can close sockfd
 * on any failure. SYSCHECK is defined outside this excerpt; presumably it
 * returns an error code from this function when the call fails.
 * On success localAddr holds the address (with port) reported by getsockname.
 */
static ncclResult_t prepareListenSocket(int sockfd, union socketAddress *localAddr, int salen) {
  if (socketToPort(&localAddr->sa)) {
    // Port is forced by env. Make sure we get the port.
    // Option names are identifiers, not bit flags, so each option needs its
    // own setsockopt call.
    int opt = 1;
    SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt");
#if defined(SO_REUSEPORT)
    SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt");
#endif
  }

  // localAddr port should be 0 (Any port)
  SYSCHECK(bind(sockfd, &localAddr->sa, salen), "bind");

  /* Get the assigned Port */
  socklen_t size = salen;
  SYSCHECK(getsockname(sockfd, &localAddr->sa, &size), "getsockname");

  char line[1024];
  TRACE(NCCL_INIT|NCCL_NET,"Listening on socket %s", socketToString(&localAddr->sa, line));

  /* Put the socket in listen mode
   * NB: The backlog will be silently truncated to the value in /proc/sys/net/core/somaxconn
   */
  SYSCHECK(listen(sockfd, 16384), "listen");
  return ncclSuccess;
}

/*
 * Create a TCP listening socket for localAddr (IPv4 or IPv6).
 *
 * fd         output: the listening descriptor; written only on success.
 * localAddr  in: address to bind; out: the address actually bound.
 *
 * Returns ncclSystemError when the socket cannot be created, the error from
 * the setup steps when one of them fails (the socket is closed first), or
 * ncclSuccess. close() requires <unistd.h>.
 */
static ncclResult_t createListenSocket(int *fd, union socketAddress *localAddr) {
  /* IPv4/IPv6 support */
  int family = localAddr->sa.sa_family;
  int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);

  /* Create socket and bind it to a port */
  int sockfd = socket(family, SOCK_STREAM, 0);
  if (sockfd == -1) {
    WARN("Net : Socket creation failed : %s", strerror(errno));
    return ncclSystemError;
  }

  ncclResult_t setupResult = prepareListenSocket(sockfd, localAddr, salen);
  if (setupResult != ncclSuccess) {
    // Do not leak the descriptor when setup fails part-way.
    close(sockfd);
    return setupResult;
  }

  *fd = sockfd;
  return ncclSuccess;
}
(本文经授权后由OneFlow发布。原文:https://blog.csdn.net/KIDGIN7439/article/details/126712106?spm=1001.2014.3001.5502)
其他人都在看 点击“阅读原文”,欢迎Star、试用OneFlow新版本

-
OpenClaw:开源 AI 个人智能助手与 AI 代理平台,构建你的专属数字分身
在 AI 技术深度渗透日常工作与生活的当下,OpenClaw 作为一款现象级开源 AI 个人智能助手与 AI 代理平台,正打破传统 AI 工具 “只对话、不落地” 的局限,以 “本地优先、自主执行、开源开放” 的核心特性,成为个人与中小型团队打造专属数字分身的首选工具,重新定义了 AI 智能体在个人场景的应用形态与价值边界。
넶2 2026-03-27 -
RTX PRO 5000系列深度解析:Pro 5000 Blackwell领航专业算力,英伟达显卡总代筑牢生态落地根基
在专业图形计算与AI加速领域,英伟达始终占据核心引领地位,其推出的专业级显卡凭借顶尖性能与全栈生态优势,成为设计师、数据科学家、科研人员的核心生产力工具。其中,RTX PRO 5000(英伟达Pro 5000)作为专业级显卡家族的核心成员,历经技术迭代升级,衍生出Pro 5000 Blackwell这一革命性产品,依托Blackwell架构的突破性优势,重新定义专业算力边界。而英伟达显卡总代作为连接厂商与终端用户的核心枢纽,承担着产品分销、技术赋能、服务落地的关键职责,让RTX PRO 5000系列的顶尖性能真正渗透至各行业专业场景,构建起“厂商-总代-终端”的完整价值链条。
넶2 2026-03-27 -
NVIDIA DGX Spark:桌面AI超算的革命性突破,重塑端侧专业算力体验
在AI技术向精细化、高效化、端侧化延伸的今天,算力需求呈现出“两极分化”的鲜明特征——一方面,超大规模数据中心依托GPU集群支撑万亿参数大模型训练;另一方面,科研机构、中小企业、专业创作者对“端侧高性能算力”的需求日益迫切,既需要媲美数据中心的算力性能,又要求具备桌面级的便捷性与高性价比。在此背景下,英伟达推出的NVIDIA DGX Spark,作为桌面AI超级计算机(桌面AI超算)领域的标杆产品,彻底打破了“专业算力=大型机房”的固有认知,将高端AI算力浓缩于桌面形态,重新定义了桌面AI超算的核心价值,为端侧专业算力需求提供了最优解。
넶2 2026-03-27 -
四款主流VR设备深度对比:PICO NEO3、PICO 4 Ultra与HTC VIVE Focus Vision、HTC VIVE Cosmos实测解析
随着XR技术的快速迭代,VR设备已从专业领域渗透至消费级市场、企业级应用等多个场景,成为连接虚拟与现实的核心载体。PICO与HTC VIVE作为全球VR行业的两大核心玩家,分别推出了覆盖不同层级、不同场景的代表性产品——PICO NEO3以高性价比打开消费级市场,PICO 4 Ultra主打高端沉浸式体验;HTC VIVE Focus Vision聚焦企业级专业场景,HTC VIVE Cosmos则兼顾消费与入门专业需求。本文将对这四款主流VR设备进行全面拆解,从硬件配置、体验感受、产品定位到适用场景,为用户提供清晰的选择参考,助力不同需求的使用者找到适配自身的VR设备。
넶2 2026-03-27 -
英伟达代理体系深度解析:从NPN到Elite精英代理,构建全栈AI生态赋能之路
在AI算力产业飞速发展的今天,英伟达凭借其在GPU芯片、AI软件及数据中心解决方案领域的绝对优势,成为全球算力生态的核心引领者。而英伟达代理体系作为其生态落地的关键载体,串联起NPN合作伙伴网络、NVAIE认证、NVIDIA AI Enterprise软件套件、数据中心解决方案授权及Elite精英级别代理等核心环节,构建起“厂商-代理-终端用户”的完整价值链条,既实现了英伟达技术与产品的广泛落地,也为合作伙伴提供了多元化的发展机遇,推动AI技术在各行业的规模化渗透。
넶3 2026-03-27 -
算力革命背后的核心支撑:英伟达引领下,GPU集群、AI服务器与算力租赁的协同进化
当生成式AI、大模型训练、自动驾驶等前沿技术进入规模化落地阶段,算力已成为数字经济时代的核心生产要素,如同工业时代的电力般不可或缺。而在这场算力革命中,英伟达凭借其领先的GPU技术,串联起AI服务器、GPU集群与算力租赁产业,构建起从核心硬件到场景服务的完整生态,其中英伟达SuperPOD更是成为超大规模算力交付的标杆,推动整个行业从“量的扩张”向“质的提升”加速转型。
넶2 2026-03-27