最近消费kafka数据到磁盘的时候遇到了这样的问题:

需求:每天大概有1千万条数据,每条数据包含19个字段信息,需要将数据写到服务器磁盘,以第二个字段作为大类建立目录,第7个字段作为小类配合时间戳作为文件名,临时文件后缀tmp,当每个文件的写入条数(可配置,比如100条)达到要求条数时,将后缀tmp改为out。

问题:大类共有30个,小类不计其数而且未知,比如大类为A,小类为a,时间戳为20180606095835234,则A目录下的文件名为20180606095835234_a.tmp,这样一来需要在此文件写满100条时,更新时间戳生成第二个文件名,如果此时有1000个文件都在写则需要有1000个时间戳,和1000个计数器记录每个文件当前的条数,如果分别定义1000个变量显然是不划算的,

尝试:中间过程想到了动态定义变量名,即

定义第七个字段:seven = data.split(\'|\')[7]

定义文件名:filename = time_stamp + \'_\' + seven+\'.tmp\',

定义文件计数器:seven + ‘_num\' = 0

定义文件时间戳:seven + \'_stamp\' = time.time( )

想法其实是没问题的,但是这里用到了一个不常用的语法:用一个变量名和一个字符串拼接出来一个新的变量名,并继续赋值(不知道我的表述是否清楚),试过了用local()函数、global()函数、exec()函数都没有达到预期效果,也许是把问题想的太复杂了

解决:最后使用三个字典将这个问题完美解决,

定义一个字典用来存计数器,字典的每一个键对应一个文件名,值对应当前计数,并实时更新;

定义一个字典用来存时间戳,键对应一个文件名,值对应时间戳,达到100条就更新一次;

定义一个字典用来存大类,键对应代号,值对应分类;

局部功能代码如下:

def kafka_to_disk():
 print(\'启动前检测上次运行时是否存在意外中断的数据文件......\')
 print(\'搜索最近一次执行脚本产生的时间目录......\')
 # 待处理临时文件列表
 tmp_list = []
 try:
  for category_dir in os.listdir(local_file_path):
   if len(os.listdir(local_file_path+os.sep+category_dir)) > 0:
    for file in os.listdir(local_file_path+os.sep+category_dir):
     if suffix in file:
      tmp_list.append(local_file_path+os.sep+category_dir+os.sep+file)
  # print(\'上次运行程序产生的临时文件有---{}\'.format(tmp_list))
 except Exception as e:
  pass
 if len(tmp_list) == 0:
  print(\'未扫描任何残留临时文件\')
 else:
  print(\'开始修复残留临时文件......\')
 tmp_num = 0
 for tmp in tmp_list:
  os.rename(tmp, tmp.split(\'.\')[0]+\'.out\')
  tmp_num += 1
 print(\'本次启动共修复残留临时文件★★★★★-----{}个-----★★★★★\'.format(tmp_num))
 
 category_poor = {
  \'1\': \'news\', \'2\': \'weibo\', \'3\': \'weixin\', \'4\': \'app\', \'5\': \'newspaper\', \'6\': \'luntan\',
  \'7\': \'blog\', \'8\': \'video\', \'9\': \'shangji\', \'10\': \'shangjia\', \'11\': \'gtzy\', \'12\': \'zfztb\',
  \'13\': \'gyfp\', \'14\': \'gjz\', \'15\': \'zfxx\', \'16\': \'ptztb\', \'17\': \'company\', \'18\': \'house\',
  \'19\': \'hospital\', \'20\': \'bank\', \'21\': \'zone\', \'22\': \'express\', \'23\': \'zpgw\', \'24\': \'zscq\',
  \'25\': \'hotel\', \'26\': \'cpws\', \'27\': \'gxqy\', \'28\': \'gpjj\', \'29\': \'dtyy\', \'30\': \'bdbk\'}
 
 time_stamp = utils.get_time_stamp() # 初始化毫秒级时间戳 : 20180509103015125
 consumer = KafkaConsumer(topic, group_id=group_id, auto_offset_reset=auto_offset_reset, bootstrap_servers=eval(bootstrap_servers))
 print(\'连接kafka成功,数据筛选中......\')
 file_poor = {}       # 子类池用于文件计数器
 time_stamp_poor = {}     # 子类时间戳池,用于触发文件切换
 time_stamp = utils.get_time_stamp()  # 初始化毫秒级时间戳 :20180509103015125
 for message in consumer:
  # 提取第8个字段自动匹配目录进行创建
  if message.value.decode().split(\'|\')[1] in category_poor:
   category = category_poor[message.value.decode().split(\'|\')[1]]
  else:
   print(message.value.decode())
   continue
  category_dir = local_file_path + os.sep + category
  if not os.path.exists(category_dir):
   os.makedirs(category_dir)
  # 提取第2个字段,用于生成文件名
  if message.value.decode().split(\'|\')[7] in time_stamp_poor:
   shot_file_name = time_stamp_poor[message.value.decode().split(\'|\')[7]] + \'_\' + message.value.decode().split(\'|\')[7]
  else:
   shot_file_name = time_stamp + \'_\' + message.value.decode().split(\'|\')[7]
  file_name = category_dir + os.sep + shot_file_name + \'.tmp\'
 
  # 给每一个文件设定一个计数器
  if message.value.decode().split(\'|\')[7] not in file_poor:
   file_poor[message.value.decode().split(\'|\')[7]] = 0
 
  with open(file_name, \'a\', encoding=\'utf-8\')as f1:
   f1.write(message.value.decode())
   file_poor[message.value.decode().split(\'|\')[7]] += 1
 
  # 触发切换文件的操作,用时间戳生成第二文件名
  if file_poor[message.value.decode().split(\'|\')[7]] == strip_number:
   time_stamp_poor[message.value.decode().split(\'|\')[7]] = utils.get_time_stamp()
   file_poor[message.value.decode().split(\'|\')[7]] = 0

以上这篇python 解决动态的定义变量名,并给其赋值的方法(大数据处理)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

收藏 打印