如何通过Python获取暗黑破坏神3战网前1000命位玩家的英雄技能?

如何通过Python获取暗黑破坏神3战网前1000命位玩家的英雄技能?

说实在的个人对游戏并没有多大的兴趣,但唯独对暴雪的Diablo系列很有感情,去年年初开始玩Diablo3,断断续续,感觉最麻烦的是选择技能,每次版本更新可能都有更优的build,这对于我这样的业余玩家来说可不是件好事,好在宏伟秘境后有了天梯,借鉴排名在前的高级玩家们build总没错,于是花了点时间写了这个脚本。

如何通过Python获取暗黑破坏神3战网前1000命位玩家的英雄技能?

脚本只是统计了主动技能、被动技能和传奇宝石的使用情况,理论上统计其它如装备等信息也是一样简单可行的,但Diablo装备的生成机制使得统计这个没有多大意义,相同的装备属性可能各有优劣,难以比较,而且某些装备坑爹的掉率也不是你想要就能有的。

题外话,不得不说Python太适合写这类功能相对简单的脚本了,一个字:快。

# -*- coding: utf-8 -*-
"""
Diablo3 排名前1000玩家英雄使用技能统计

python diablo.py help
python diablo.py [barbarian|crusader|demon-hunter|monk'|witch-doctor|wizard]

默认使用的是亚服的数据,如果需要美服或欧服,更改`_rank_page`和`_api`变量地址即可

Copyright (c) 2015 JinnLynn <eatfishlin@gmail.com>
Released under the terms of the MIT license.
"""
from __future__ import unicode_literals, print_function, absolute_import
import os
import sys
import urllib2
import json
import re

__version__ = '1.0.0'
__author__ = 'JinnLynn <eatfishlin@gmail.com>'
__license__ = 'The MIT License'
__copyright__ = 'Copyright 2015 JinnLynn'

# 排名页面
_rank_page = 'http://tw.battle.net/d3/zh/rankings/'
# api
_api = 'http://tw.battle.net/api/d3/'
_api_profile = os.path.join(_api, 'profile')
_api_data = os.path.join(_api, 'data')

_hero_classes = {
 'barbarian': '野蠻人', 'crusader': '聖教軍', 'demon-hunter': '狩魔獵人',
 'monk': '武僧', 'witch-doctor': '巫醫', 'wizard': '秘術師'}

_retry = 5

_hero_class = ''
_active_skills = {}
_passive_skills = {}
_unique_gems = {}


def _clear_output(msg=''):
 sys.stdout.write('\r{:30}'.format(' '))
 sys.stdout.write('\r{}'.format(msg))
 sys.stdout.flush()


def _process(stated, total):
 msg = '英雄数据分析中... {}/{}'.format(stated, total)
 _clear_output(msg)


def _get(url, is_json=True):
 # print('GET: ', url)
 retry = 5 if _retry < 1 else _retry
 while retry > 0:
  try:
   req = urllib2.urlopen(url.encode('utf8'), timeout=10)
   return json.load(req) if is_json else req.read()
  except KeyboardInterrupt, e:
   raise e
  except Exception, e:
   retry -= 1
   # print('retry', retry, e)
   # raise e


def _api_url(*args, **kwargs):
 slash = kwargs.get('slash', False)
 args = [unicode(arg) for arg in args]
 url = os.path.join(*args).rstrip('/')
 return url + '/' if slash else url


def get_era():
 req = urllib2.urlopen(_rank_page)
 return req.geturl().split('/')[-2]


def get_rank_page_url(era):
 url_part = 'rift-'
 if _hero_class == 'demon-hunter':
  url_part += 'dh'
 elif _hero_class == 'witch-doctor':
  url_part += 'wd'
 else:
  url_part += _hero_class
 return os.path.join(_rank_page, 'era', era, url_part)


def fetch_rank_list():
 tags = []
 try:
  _clear_output('获取当前游戏纪元...')
  era = get_era()
  _clear_output('获取当前排名前1000的玩家...')
  url = get_rank_page_url(era)
  html = _get(url, is_json=False)
  # re parse
  lst = re.findall(
   r"a href=\"(.*)\" title=.*class=\"icon-profile link-first\">",
   html.decode('utf8'),
   re.UNICODE)
  # BeautifulSoup parse
  # import bs4
  # soup = bs4.BeautifulSoup(html)
  # lst = soup.select('#ladders-table tbody tr .battletag a')['href']
  for item in lst:
   try:
    tags.append(item.split('/')[-2])
   except:
    pass
 except Exception, e:
  print('fetch rank list fail. {}'.format(_rank_page))
  raise e
 return tags


def get_hero(player_tag):
 url = _api_url(_api_profile, player_tag, slash=True)
 data = _get(url)
 hero_selected = None
 for hero in data.get('heroes', []):
  if hero['class'] != _hero_class:
   continue
  last_updated = hero_selected['last-updated']
  # 最近使用的英雄
  if hero_selected is None or last_updated < hero['last-updated']:
   hero_selected = hero
 if not hero_selected:
  raise Exception('{} hero missing.'.format(player_tag))
 url = _api_url(_api_profile, player_tag, 'hero', hero_selected['id'])
 return _get(url)


# 主动技能符文
def stat_active_skill_rune(skill_slug, rune):
 global _active_skills
 if not rune:
  return
 slug = rune.get('slug')
 if slug in _active_skills[skill_slug]['rune']:
  _active_skills[skill_slug]['rune'][slug]['count'] += 1
 else:
  _active_skills[skill_slug]['rune'][slug] = {
   'count': 1,
   'name': rune.get('name')
  }


# 主动技能
def stat_active_skill(active):
 global _active_skills
 slug = active.get('skill', {}).get('slug')
 # d3 API 返回的数据中可能存在空的数据
 if not slug:
  return
 if slug in _active_skills:
  _active_skills[slug]['count'] += 1
 else:
  _active_skills[slug] = {
   'count': 1,
   'name': active.get('skill').get('name'),
   'rune': {}
  }
 stat_active_skill_rune(slug, active.get('rune'))


# 被动技能
def stat_passive_skill(passive):
 global _passive_skills
 slug = passive.get('skill', {}).get('slug')
 # d3 API 返回的数据中可能存在空的数据
 if not slug:
  return
 if slug in _passive_skills:
  _passive_skills[slug]['count'] += 1
 else:
  _passive_skills[slug] = {
   'count': 1,
   'name': passive.get('skill').get('name')
  }


def stat_unique_gem(items):
 global _unique_gems

 def get_gem(tooltip):
  if not tooltip:
   return None, None
  url = _api_url(_api_data, tooltip)
  data = _get(url)
  gems = data.get('gems')
  if not gems:
   return None, None
  gem = gems[0].get('item', {})
  return gem.get('id'), gem.get('name')

 if not items:
  return

 lst = [items.get(s, {}) for s in ['leftFinger', 'rightFinger', 'neck']]
 for tooltip in [d.get('tooltipParams', None) for d in lst]:
  id_, name = get_gem(tooltip)
  if not id_:
   continue
  if id_ in _unique_gems:
   _unique_gems[id_]['count'] += 1
  else:
   _unique_gems[id_] = {
    'count': 1,
    'name': name
   }


def stat(hero):
 global _active_skills, _passive_skills

 map(stat_active_skill, hero.get('skills', {}).get('active', []))
 map(stat_passive_skill, hero.get('skills', {}).get('passive', []))

 items = hero.get('items', {})
 stat_unique_gem(items)


def output(hero_stated, hero_stat_failed):
 def sort(data, count=10):
  d = sorted(data.items(), key=lambda d: d[1]['count'], reverse=True)
  return d if count <= 0 else d[0:count]

 _clear_output()

 # print('======')
 # print(hero_stated, hero_stat_failed)
 # print('======')
 # pprint(_active_skills)
 # print('======')
 # pprint(_passive_skills)
 # print('======')
 # pprint(_unique_gems)
 # pprint(_active_skills.items())
 # print('======')

 print('\n=== RESULT ===\n')
 print('统计英雄数\n')
 print(' 成功: {} 失败: {}\n'.format(hero_stated, hero_stat_failed))

 print('主动技能使用排名: ')
 for _, d in sort(_active_skills):
  runes = []
  for _, r in sort(d.get('rune', {})):
   runes.append('{name}[{count}]'.format(**r))
  d.update({'rune_rank': ', '.join(runes)})
  print(' {name}[{count}]: {rune_rank}'.format(**d))
 print()

 print('被动技能使用排名: ')
 for _, d in sort(_passive_skills):
  print(' {name}[{count}]'.format(**d))
 print()

 print('传奇宝石使用排名: ')
 for _, d in sort(_unique_gems):
  print(' {name}[{count}]'.format(**d))
 print()


def prepare():
 global _hero_class

 def print_hc():
  print('仅支持以下英雄类型, 默认 demon-hunter:\n')
  for c, n in _hero_classes.items():
   print(c, ':', n)

 if len(sys.argv) == 1:
  _hero_class = 'demon-hunter'
 elif len(sys.argv) > 2:
  sys.exit('参数错误')
 else:
  arg = sys.argv[1]
  if arg == 'help':
   print_hc()
   print('\nTips: 运行中可随时Ctrl+C终止以获得已统计的数据结果')
   sys.exit()
  elif arg not in _hero_classes:
   print_hc()
   sys.exit()
  else:
   _hero_class = arg


def main():
 prepare()
 print('待分析的英雄类型:', _hero_classes[_hero_class])

 hero_stated = 0
 hero_stat_failed = 0
 try:
  tags = fetch_rank_list()
  if not tags:
   raise Exception('parse battle.net rank page fail.')
 except Exception, e:
  print('error,', e)
  sys.exit()

 total = len(tags)

 for tag in tags:
  try:
   hero = get_hero(tag)
   if not hero:
    raise Exception('no hero data')
   stat(hero)
   hero_stated += 1
   _process(hero_stated, total)
  except KeyboardInterrupt:
   break
  except Exception, e:
   # print('Fail: ', tag, e, hero)
   hero_stat_failed += 1

 output(hero_stated, hero_stat_failed)


if __name__ == '__main__':
 main()