原理
- 由于ceph分片的特性,使得数据分配均衡就可以认为是pool中osd的pg数目分布大致均衡.
- pgupmap是ceph12新加入的特性,它在osdmap中记录了某个pg的单独映射:客户端根据crushmap计算出结果后,还需要遍历upmap,并据此修改pg和osd的对应关系,才能找到正确对应的osd.12版本新加入的balancer的一个方案就是基于upmap进行均衡的,不过balancer是调节全部的pool,并且要求pool之间不能共享osd才能完成调节.
pgupmap举例
形如以下:
pg_upmap_items 4.1 [17,15,8,7] // 对于pg4.1,把应该映射到17的osd改为15,应该映射到8的osd改为7.
pg_upmap_items 4.6 [12,15,8,7]
pg_upmap_items 4.8 [11,6,17,15]
pg_upmap_items 4.a [8,6]
pg_upmap_items 4.b [11,6,16,15]
可以通过 ceph osd dump 查看(输出中的 pg_upmap_items 行)
调节脚本
场景:可以抽象为N(osd)堆苹果,每堆苹果中含有M个苹果(pg),每次从一堆苹果中拿取一个苹果放入另一堆,直至苹果最多的一堆与苹果最少的一堆之间的数目差不大于G.
算法原理:简单利用了贪心算法,基于每个osd中的pg数目进行排序,每次从pg最多的osd中拿一个pg放入pg最少的osd,然后把修改过的两个osd重新插入到有序列表中,直至最大值与最小值之差不大于G(脚本中按host分组,只在同一host内的osd之间调节).这样可以保证调节step数最小.
#!/usr/bin/python
import sys
import os
import logging
import commands
import argparse
import time
import json
import collections
# Written for Ceph v12.2.x.

# --- user configuration ---
# NOTE: `execmd` is read by exePgUpmap() and overwritten from the command line
# in the __main__ section; `gap` and `targetpools` are passed around as
# function parameters by the code in this file.
gap = 3
targetpools = ['21']
execmd = True

# One shared format for both log destinations.
formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")

# File destination; mode='w' truncates the log on every run.
fh = logging.FileHandler('/var/log/ceph/autoreweight.log', mode='w')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)

# Stream destination (StreamHandler's default stream).
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)

# Root logger: emit everything from DEBUG upwards to both handlers.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(fh)
logger.addHandler(ch)
def getPools():
    """Return the pool identifiers known to the cluster as a list of strings.

    The values are the second whitespace-separated field of every line of
    ``ceph osd pool ls detail`` that starts with ``pool``; callers in this
    file treat them as pool ids. The command's exit status is not checked.
    """
    listing = commands.getstatusoutput(
        "ceph osd pool ls detail | grep ^pool| awk '{print $2}' ")[1]
    return listing.split('\n')
def getOsds():
    """Return the output lines of ``ceph osd ls`` as a list of strings.

    Callers in this file use the entries as OSD ids. The command's exit
    status is not checked.
    """
    listing = commands.getstatusoutput("ceph osd ls")[1]
    return listing.split('\n')
def getPoolName(pool):
    """Return the name of the pool whose id is ``pool``.

    The name is the second field of the matching header line printed by
    ``ceph osd pool stats``.

    Args:
        pool: pool id (string, or anything ``str()`` can render).

    Returns:
        The raw command output; the command's exit status is not checked.
    """
    # Anchor on the whole " id <N>" suffix. Matching only "<N>$" would also
    # select every pool whose id merely ends with the same digits (21 vs 121).
    # NOTE(review): assumes header lines look like "pool <name> id <N>" -
    # confirm against the deployed Ceph release.
    cmd = "ceph osd pool stats | grep ' id " + str(pool) + "$' | awk '{print $2}'"
    (status, poolname) = commands.getstatusoutput(cmd)
    return poolname
def getPgNum(poolname):
    """Return the ``pg_num`` value of the pool named ``poolname``.

    Runs ``ceph osd pool get <poolname> pg_num -f json`` and extracts the
    ``pg_num`` key from the decoded JSON reply. The exit status is not
    checked, so a failing command surfaces as a JSON decoding error.
    """
    query = "ceph osd pool get " + poolname + " pg_num -f json"
    reply = commands.getstatusoutput(query)[1]
    return json.loads(reply)['pg_num']
def getPoolSize(poolname):
    """Return the ``size`` value of the pool named ``poolname``.

    Runs ``ceph osd pool get <poolname> size -f json`` and extracts the
    ``size`` key from the decoded JSON reply. The exit status is not
    checked, so a failing command surfaces as a JSON decoding error.
    """
    query = "ceph osd pool get " + poolname + " size -f json"
    reply = commands.getstatusoutput(query)[1]
    return json.loads(reply)['size']
def getDefaultGap(pool):
    """Pick a default max-min PG gap for ``pool`` from its average PG load.

    The average is ``pg_num * size`` divided (integer division) by the number
    of OSDs that currently hold PGs of the pool, and is mapped to a gap:

        average  < 50   -> 3
        average <= 100  -> 5
        average <= 150  -> 8
        average <= 200  -> 10
        otherwise       -> 15

    Note that this calls getPgByPool(), which waits for all PGs to be
    active+clean and prints the per-OSD distribution.

    Args:
        pool: pool id, as it appears in getPools().

    Returns:
        The gap as an int.

    Raises:
        ValueError: if no OSD holds any PG of the pool.
    """
    pools = getPools()
    osds = getOsds()
    res = getPgByPool(pools, osds)
    osdscount = len(res[pool])
    if osdscount == 0:
        # Previously this surfaced as an unexplained ZeroDivisionError.
        raise ValueError("pool " + str(pool) + " has no pg on any osd")
    poolname = getPoolName(pool)
    pgnum = getPgNum(poolname)
    size = getPoolSize(poolname)
    # Integer division, matching Python 2 "/" on ints.
    avgpgperosd = (pgnum * size) // osdscount
    # Ascending thresholds; each test only needs the upper bound because the
    # lower bound was already excluded by the previous return.
    if avgpgperosd < 50:
        return 3
    if avgpgperosd <= 100:
        return 5
    if avgpgperosd <= 150:
        return 8
    if avgpgperosd <= 200:
        return 10
    return 15
def waitAllPgAC():
    """Block until ``ceph -s`` reports every PG as active+clean.

    The expected total is the line count of ``ceph pg ls`` minus one
    (presumably the header line - verify for the Ceph release in use). The
    function polls every 5 seconds until ``ceph -s`` contains the text
    "<total> active+clean".
    """
    while True:
        linecount = commands.getstatusoutput("ceph pg ls | wc -l")[1]
        wantstr = str(int(linecount) - 1) + " active+clean"
        matched = commands.getstatusoutput("ceph -s | grep " + "'" + wantstr + "'")[1]
        if matched != "":
            return
        logger.info("Wait all pg active+clean,Sleep 5s")
        time.sleep(5)
def getPgByPool(pools, osds):
    """Count, for every pool, how many PGs each OSD holds.

    Waits for all PGs to be active+clean first, then parses column 15 of
    ``ceph pg dump`` for each pool (a bracketed, comma-separated OSD list;
    presumably the up set on Ceph 12 - verify). The distribution and the
    per-pool total are printed as a side effect.

    Args:
        pools: pool ids (strings).
        osds: OSD ids (strings).

    Returns:
        ``{pool: {osd: pgcount}}``; OSDs holding no PG of a pool are removed
        from that pool's dict.
    """
    waitAllPgAC()

    # Start every (pool, osd) pair at zero.
    res = {}
    for pool in pools:
        res[pool] = dict.fromkeys(osds, 0)

    for pool in pools:
        cmd = "ceph pg dump | grep '^" + pool + "\\.'| awk '{print $15}'"
        dumped = commands.getstatusoutput(cmd)[1].split('\n')
        # The first output line is skipped; presumably it is the status
        # message `ceph pg dump` writes to stderr - verify.
        for line in dumped[1:]:
            # Strip the surrounding brackets, then split the OSD ids.
            for pgosd in line[1:-1].split(','):
                res[pool][pgosd] += 1

    for pool in pools:
        print(pool)
        pgsum = 0
        for osd in osds:
            held = res[pool][osd]
            if held == 0:
                res[pool].pop(osd)
                continue
            pgsum += held
            print(osd + ':' + str(held))
        print('pgsum:' + str(pgsum))
    return res
def getOsdByHost(osdlist):
    """Group ``(osd, pgcount)`` entries by the host that contains the OSD.

    Host membership comes from the ``host`` nodes of ``ceph osd tree -f json``
    (an entry belongs to a host when ``int(entry[0])`` is in the node's
    ``children``). Relative order of the entries is preserved per host.

    Args:
        osdlist: list of ``(osd, pgcount)`` entries. It is modified in place:
            every entry that was assigned to a host is removed from it.

    Returns:
        ``{hostname: [entry, ...]}``; hosts without any entry are omitted.
    """
    grouped = {}
    tree = json.loads(commands.getstatusoutput("ceph osd tree -f json")[1])
    for node in tree['nodes']:
        if node['type'] != 'host':
            continue
        members = [entry for entry in osdlist
                   if int(entry[0]) in node['children']]
        if members:
            grouped.setdefault(node['name'], []).extend(members)
        # Claimed entries are dropped so later hosts do not rescan them.
        for entry in members:
            osdlist.remove(entry)
    return grouped
def _reinsertDonor(ranked, entry):
    """Re-insert ``entry`` into ``ranked`` (descending by count), behind ties."""
    for idx, item in enumerate(ranked):
        if item[1] < entry[1]:
            ranked.insert(idx, entry)
            return
    # Nothing smaller is left (or the list is empty): it belongs at the tail.
    ranked.append(entry)


def _reinsertReceiver(ranked, entry):
    """Re-insert ``entry`` into ``ranked`` (descending by count), ahead of ties."""
    for idx in range(len(ranked) - 1, -1, -1):
        if ranked[idx][1] > entry[1]:
            ranked.insert(idx + 1, entry)
            return
    # Nothing larger is left (or the list is empty): it belongs at the head.
    ranked.insert(0, entry)


def calcPgUpmap(osdpglist, gap):
    """Plan the PG moves that balance the OSDs of each host.

    OSDs are grouped by host with getOsdByHost(). Within a host the OSD with
    the most PGs repeatedly gives one PG to the OSD with the fewest, until
    the difference between the two is at most ``gap``.

    Args:
        osdpglist: list of ``(osd, pgcount)`` sorted by descending pgcount.
            It is consumed by getOsdByHost().
        gap: largest tolerated max-min difference inside one host. Values
            below 1 are treated as 1, because a difference of 1 cannot be
            reduced and would otherwise make two OSDs swap a PG forever.

    Returns:
        ``(steps, osdhostlist)`` where ``steps`` maps host name to a list of
        ``(from_osd, to_osd)`` moves and ``osdhostlist`` maps host name to
        the resulting ``(osd, pgcount)`` list, still sorted descending.
    """
    osdhostlist = getOsdByHost(osdpglist)
    print(osdhostlist)
    steps = {}
    limit = max(gap, 1)
    for host, ranked in osdhostlist.items():
        while ranked and ranked[0][1] - ranked[-1][1] > limit:
            donor = ranked.pop(0)
            receiver = ranked.pop(-1)
            # Both entries must always be put back. The previous code dropped
            # them when no strictly smaller/larger neighbour remained, which
            # lost OSDs from the ranking and raised IndexError on hosts with
            # only two OSDs.
            _reinsertDonor(ranked, (donor[0], donor[1] - 1))
            _reinsertReceiver(ranked, (receiver[0], receiver[1] + 1))
            steps.setdefault(host, []).append((donor[0], receiver[0]))
    return steps, osdhostlist
def getTargetPg(steps, pool):
    """Choose concrete PGs of ``pool`` to carry out the planned moves.

    Every PG index from 0 to pg_num-1 is queried with ``ceph pg map`` (one
    command per PG, which is what makes this slow). Field 8 of the reply is
    parsed as a bracketed OSD list (presumably the up set - verify). When one
    of those OSDs is the source of a pending move, that move is assigned to
    the PG. A PG takes at most one move per host.

    Args:
        steps: ``{host: [(from_osd, to_osd), ...]}``. Consumed in place:
            assigned moves are removed and emptied hosts are popped.
        pool: pool id.

    Returns:
        ``{hex(pgindex): [(from_osd, to_osd), ...]}``; keys are in Python
        ``hex()`` form, e.g. ``'0x1f'``.
    """
    assigned = {}
    pgnum = getPgNum(getPoolName(pool))
    # Per-host marker: True once the host already got a move for this PG.
    foundflag = {}
    for host in steps.keys():
        foundflag[host] = False
    for index in range(int(pgnum)):
        pgkey = str(hex(index))
        cmd = "ceph pg map " + str(pool) + "." + pgkey + "| awk '{print $8}'"
        mapped = commands.getstatusoutput(cmd)[1]
        for osd in mapped[1:-1].split(','):
            emptied = []
            matched = False
            for host, pending in steps.items():
                if foundflag[host]:
                    continue
                for step in pending:
                    if str(osd) == step[0]:
                        assigned.setdefault(pgkey, []).append(step)
                        # Safe despite iterating: we leave the loop at once.
                        pending.remove(step)
                        matched = True
                        foundflag[host] = True
                        break
                if len(pending) == 0:
                    emptied.append(host)
                if matched:
                    break
            for host in emptied:
                steps.pop(host)
        # Reset the markers before looking at the next PG.
        for host in foundflag:
            foundflag[host] = False
        # Stop early once every planned move has a PG.
        if len(steps) == 0:
            break
    return assigned
def reweightOnePool(pool, pgbypool, gap):
    """Compute the PG remaps that balance ``pool``.

    Args:
        pool: pool id.
        pgbypool: ``{pool: {osd: pgcount}}`` as returned by getPgByPool().
        gap: tolerated max-min PG difference, passed to calcPgUpmap().

    Returns:
        The mapping produced by getTargetPg() for the planned moves.
    """
    # most_common() yields (osd, pgcount) pairs sorted by descending count.
    ranking = collections.Counter(pgbypool[pool]).most_common()
    steps, reslist = calcPgUpmap(ranking, gap)
    print(steps)
    print(reslist)
    return getTargetPg(steps, pool)
def exePgUpmap(pool, steps):
    """Apply (or only display) the ``ceph osd pg-upmap-items`` commands.

    One command is built per PG: ``ceph osd pg-upmap-items <pool>.<pg>``
    followed by every ``from to`` OSD pair of that PG. When the module-level
    ``execmd`` is truthy the command is executed, otherwise it is printed.

    Args:
        pool: pool id (string).
        steps: ``{pg: [(from_osd, to_osd), ...]}`` with string OSD ids, as
            returned by getTargetPg().
    """
    print("%s %s" % ("global execmd = ", execmd))
    for pg, moves in steps.items():
        cmd = "ceph osd pg-upmap-items " + pool + "." + pg + " "
        cmd += "".join(item[0] + " " + item[1] + " " for item in moves)
        if execmd:
            (status, output) = commands.getstatusoutput(cmd)
            if status != 0:
                # A rejected remap used to go unnoticed; report it and carry
                # on with the remaining PGs.
                logger.error("cmd failed (status %s): %s: %s", status, cmd, output)
        else:
            print("%s %s" % ("Will exe cmd :", cmd))
def reweightByPg(gap, targetpools):
    """Balance the first pool of ``targetpools`` and apply the result.

    Collects the current PG distribution, computes the remaps with
    reweightOnePool() (printing them and the elapsed seconds), then hands
    them to exePgUpmap().

    Args:
        gap: tolerated max-min PG difference.
        targetpools: list of pool ids; only element 0 is used.
    """
    distribution = getPgByPool(getPools(), getOsds())
    began = time.time()
    steps = reweightOnePool(targetpools[0], distribution, gap)
    finished = time.time()
    print(steps)
    print(finished - began)
    exePgUpmap(targetpools[0], steps)
def printPgByPool():
    """Print the per-OSD PG distribution of every pool.

    The printing itself is done by getPgByPool(); its return value is
    discarded.
    """
    getPgByPool(getPools(), getOsds())
def checkTarPool(targetpools):
    """Exit with status -3 unless the first target pool exists.

    Args:
        targetpools: list of pool ids; only element 0 is checked against
            getPools().
    """
    if targetpools[0] in getPools():
        return
    logger.error("poolid not in pools")
    sys.exit(-3)
def autoReweight(pool, gap=None):
    """Balance the PGs of one pool; the script's main entry point.

    Args:
        pool: pool id (converted with ``str()``).
        gap: tolerated max-min PG difference; when ``None`` it is derived
            with getDefaultGap().

    Any ``Exception`` raised on the way is logged and not re-raised. The
    ``sys.exit()`` in checkTarPool() is not an ``Exception`` and still
    terminates the program.
    """
    # In the first version targetpools was designed as a list, even though it
    # only ever holds one element.
    try:
        targetpools = [str(pool)]
        checkTarPool(targetpools)
        if gap is None:
            gap = getDefaultGap(targetpools[0])
        print("%s %s" % (targetpools, gap))
        reweightByPg(gap, targetpools)
    except Exception as e:
        # e.message is deprecated and empty for many exception types; log the
        # exception itself together with its traceback instead.
        logger.exception("autoReweight failed: %s", e)
if __name__ == '__main__':
    # Command line: <poolid> [--gap N] [--execmd {0,1}]
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'poolid',
        type=int,
        help='target pool id to reweight',
    )
    parser.add_argument(
        '--gap',
        type=int,
        help='the wanted gap between max and min pgs in the pool,default cal according to pgperosd',
    )
    parser.add_argument(
        '--execmd',
        type=int,
        choices=[0,1],
        default=0,
        help='0 for display the upmapcmd ,1 for exe upmapcmd,default 0',
    )
    args = parser.parse_args()
    # Overrides the module-level default that exePgUpmap() reads.
    execmd = args.execmd
    autoReweight(args.poolid, args.gap)
结束
执行效率比较慢,如果pg数目较多可能需要等一会,根据pg数目的不同应该适当调节gap的取值,如果有更好的方案还望指正.
继续阅读与本文标签相同的文章
下一篇 :
面试 java
-
2019云栖大会 | 开源数据库界大神集体现身,邀你共同感受“开源魅力”
2026-05-18栏目: 教程
-
陈冠希竟然和罗永浩联手了!难不成要搞个锤子?当然不是……
2026-05-18栏目: 教程
-
中国最强快递公司,年入300亿,被称作“哪都通”,但国人都很嫌弃
2026-05-18栏目: 教程
-
原厂直播:ANSYS SI/PI/EMI&TI 2019 R3 新功能介绍
2026-05-18栏目: 教程
-
距离死亡不到3个月 Win7被疯狂攻击
2026-05-18栏目: 教程
