Scraping 2019-nCoV Epidemic Data with Python
This epidemic has given everyone a Spring Festival they will never forget. As someone who is half a Wuhan native, this year's holiday has felt especially heavy. As a GISer I can't do much to help on the front lines, so I decided to collect the data from this outbreak, treat it as a hands-on exercise, and see whether the GIS methods I've learned can actually be applied to a real-world scenario and deliver some value.
But you can't make bricks without straw: a GISer without data is like a soldier without ammunition; however motivated, there is nothing to work with. The data won't fall out of the sky, so the tools have to be built by hand. Looking at the official epidemic bulletins from DXY (丁香园) and Tencent, I wondered whether there was a way to scrape them directly.
PS: If you only need the data, feel free to skip the code discussion below.
Standing on the Shoulders of Giants
Before building a wheel from scratch, I make a habit of checking whether someone has already built it. A quick search turned up exactly that. I cloned the code, ran it, and the results were quite good, so here it is.
Scraping Tencent's Epidemic Data
GitHub Repo: https://github.com/dakula009/China_CoronaVirus_Data_Miner
The core code in the cloned repo is short and easy to follow.
# -*- coding: utf-8 -*-
import requests
import json
import time
import pandas as pd

# Request URL; %d is filled with the current timestamp as a cache buster
url = 'https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5&callback=&_=%d'
# Spoofed request headers so the API treats this like a normal browser request
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36',
    'referer': 'https://news.qq.com/zt2020/page/feiyan.htm?from=timeline&isappinstalled=0'
}

# Fetch the data; the response is JSON whose 'data' field is itself a JSON string
r = requests.get(url % time.time(), headers=headers)
data = json.loads(r.text)
data = json.loads(data['data'])

lastUpdateTime = data['lastUpdateTime']
print('Data last updated: ' + str(lastUpdateTime))

areaTree = data['areaTree']

# Create an empty DataFrame; columns are province, city, confirmed, deaths, cured
col_names = ['省', '市', '确认', '死亡', '治愈']
my_df = pd.DataFrame(columns=col_names)

for item in areaTree:
    if item['name'] == '中国':
        item_ps = item['children']
        for item_p in item_ps:
            province = item_p['name']
            # print(province)
            item_cs = item_p['children']
            for item_c in item_cs:
                prefecture = item_c['name']
                # print('  ' + prefecture)
                # print('  ' + str(item_c['total']))
                confirm = item_c['total']['confirm']
                # suspect = item_c['total']['suspect']
                death = item_c['total']['dead']
                heal = item_c['total']['heal']
                # Append one row per city to the DataFrame
                data_dict = {'省': province, '市': prefecture, '确认': confirm, '死亡': death, '治愈': heal}
                my_df.loc[len(my_df)] = data_dict

# Save the data, one CSV per update date
my_df.to_csv(r'./china_status_{}.csv'.format(str(lastUpdateTime).split()[0]), encoding='utf_8_sig', header=True)
print('Success')
The code is concise and clearly commented, so it makes a good reference. The output is a CSV with the columns 省 (province), 市 (city), 确认 (confirmed), 死亡 (deaths), and 治愈 (cured).
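To double-check the export, here is a minimal sketch of reading the generated file back with pandas and rolling the city-level counts up to provinces. The file name is only a placeholder for whatever china_status_<date>.csv the script actually wrote.

import pandas as pd

# Placeholder file name -- use the date the script produced
df = pd.read_csv('./china_status_2020-02-05.csv', index_col=0)

# Aggregate city-level counts up to the province level
by_province = df.groupby('省')[['确认', '死亡', '治愈']].sum()
print(by_province.sort_values('确认', ascending=False).head(10))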
Scraping DXY (丁香园) Epidemic Data
GitHub Repo: https://github.com/BlankerL/DXY-2019-nCoV-Crawler
This project is more complete: it is written in Python 3 and requires a local MongoDB instance, so download and study it if you have the environment. The core code is under 200 lines:
"""
@ProjectName: DXY-2019-nCov-Crawler
@FileName: crawler.py
@Author: Jiabao Lin
@Date: 2020/1/21
"""
from bs4 import BeautifulSoup
from service.db import DB
from service.countryTypeMap import country_type
import re
import json
import time
import logging
import datetime
import requests
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)
headers = {
    'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.117 Safari/537.36'
}


class Crawler:
    def __init__(self):
        self.session = requests.session()
        self.session.headers.update(headers)
        self.db = DB()
        self.crawl_timestamp = int()

    def run(self):
        while True:
            self.crawler()
            time.sleep(60)

    def crawler(self):
        # The DXY mobile page embeds the latest statistics as JSON inside <script> tags;
        # pull them out with regular expressions and hand them to the parsers.
        while True:
            self.crawl_timestamp = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
            try:
                r = self.session.get(url='https://3g.dxy.cn/newh5/view/pneumonia')
            except requests.exceptions.ChunkedEncodingError:
                continue
            soup = BeautifulSoup(r.content, 'lxml')
            overall_information = re.search(r'\{("id".*?)\]\}', str(soup.find('script', attrs={'id': 'getStatisticsService'})))
            province_information = re.search(r'\[(.*?)\]', str(soup.find('script', attrs={'id': 'getListByCountryTypeService1'})))
            area_information = re.search(r'\[(.*)\]', str(soup.find('script', attrs={'id': 'getAreaStat'})))
            abroad_information = re.search(r'\[(.*)\]', str(soup.find('script', attrs={'id': 'getListByCountryTypeService2'})))
            news = re.search(r'\[(.*?)\]', str(soup.find('script', attrs={'id': 'getTimelineService'})))
            if not overall_information or not province_information or not area_information or not news:
                continue
            self.overall_parser(overall_information=overall_information)
            self.province_parser(province_information=province_information)
            self.area_parser(area_information=area_information)
            self.abroad_parser(abroad_information=abroad_information)
            self.news_parser(news=news)
            break

        # The rumour-debunking feed is served as a separate JSON file.
        while True:
            self.crawl_timestamp = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
            try:
                r = self.session.get(url='https://file1.dxycdn.com/2020/0127/797/3393185293879908067-115.json')
            except requests.exceptions.ChunkedEncodingError:
                continue
            # Use try-except to ensure the .json() method will not raise exception.
            try:
                if r.status_code != 200:
                    continue
                elif r.json().get('code') == 'success':
                    self.rumor_parser(rumors=r.json().get('data'))
                    break
                else:
                    continue
            except json.decoder.JSONDecodeError:
                continue

        logger.info('Successfully crawled.')

    def overall_parser(self, overall_information):
        overall_information = json.loads(overall_information.group(0))
        overall_information.pop('id')
        overall_information.pop('createTime')
        overall_information.pop('modifyTime')
        overall_information.pop('imgUrl')
        overall_information.pop('deleted')
        overall_information['countRemark'] = overall_information['countRemark'].replace(' 疑似', ',疑似').replace(' 治愈', ',治愈').replace(' 死亡', ',死亡').replace(' ', '')
        if not self.db.find_one(collection='DXYOverall', data=overall_information):
            overall_information['updateTime'] = self.crawl_timestamp
            self.db.insert(collection='DXYOverall', data=overall_information)

    def province_parser(self, province_information):
        provinces = json.loads(province_information.group(0))
        for province in provinces:
            province.pop('id')
            province.pop('tags')
            province.pop('sort')
            province['comment'] = province['comment'].replace(' ', '')
            if self.db.find_one(collection='DXYProvince', data=province):
                continue
            province['crawlTime'] = self.crawl_timestamp
            province['country'] = country_type.get(province['countryType'])
            self.db.insert(collection='DXYProvince', data=province)

    def area_parser(self, area_information):
        area_information = json.loads(area_information.group(0))
        for area in area_information:
            area['comment'] = area['comment'].replace(' ', '')
            if self.db.find_one(collection='DXYArea', data=area):
                continue
            area['country'] = '中国'
            area['updateTime'] = self.crawl_timestamp
            self.db.insert(collection='DXYArea', data=area)

    def abroad_parser(self, abroad_information):
        countries = json.loads(abroad_information.group(0))
        for country in countries:
            country.pop('id')
            country.pop('tags')
            country.pop('countryType')
            country.pop('provinceId')
            country['country'] = country.get('provinceName')
            country['provinceShortName'] = country.get('provinceName')
            country.pop('cityName')
            country.pop('sort')
            country['comment'] = country['comment'].replace(' ', '')
            if self.db.find_one(collection='DXYArea', data=country):
                continue
            country['updateTime'] = self.crawl_timestamp
            self.db.insert(collection='DXYArea', data=country)

    def news_parser(self, news):
        news = json.loads(news.group(0))
        for _news in news:
            _news.pop('pubDateStr')
            if self.db.find_one(collection='DXYNews', data=_news):
                continue
            _news['crawlTime'] = self.crawl_timestamp
            self.db.insert(collection='DXYNews', data=_news)

    def rumor_parser(self, rumors):
        for rumor in rumors:
            rumor.pop('score')
            rumor['body'] = rumor['body'].replace(' ', '')
            if self.db.find_one(collection='DXYRumors', data=rumor):
                continue
            rumor['crawlTime'] = self.crawl_timestamp
            self.db.insert(collection='DXYRumors', data=rumor)


if __name__ == '__main__':
    crawler = Crawler()
    crawler.run()
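If you only want to experiment with the parsing logic and don't have MongoDB installed, one option is to swap in an in-memory stand-in for service/db.py. The sketch below is hypothetical and not part of the repository: it implements only the find_one and insert calls that crawler.py makes, keeping everything in a dict of lists.

# Hypothetical drop-in replacement for service/db.py (not part of the repo);
# it mimics only the find_one/insert calls used in crawler.py.
class DB:
    def __init__(self):
        self.collections = {}

    def find_one(self, collection, data):
        # Return a stored record that contains all of the queried key/value pairs, else None
        for record in self.collections.get(collection, []):
            if all(record.get(k) == v for k, v in data.items()):
                return record
        return None

    def insert(self, collection, data):
        # Append a copy of the record to the named collection
        self.collections.setdefault(collection, []).append(dict(data))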
The data looks like this. Field description:
provinceName,cityName,province_confirmedCount,province_suspectedCount,province_curedCount,province_deadCount,city_confirmedCount,city_suspectedCount,city_curedCount,city_deadCount,updateTime
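Once the records are exported to a CSV with these columns, a few lines of pandas make the timestamps readable and keep only the latest snapshot per city. This is a sketch under two assumptions: the local file is named DXYArea.csv (a placeholder), and updateTime is a millisecond Unix timestamp, as set by crawl_timestamp in the crawler above.

import pandas as pd

# Hypothetical export with the fields listed above
df = pd.read_csv('DXYArea.csv')

# updateTime is written by the crawler as a millisecond Unix timestamp
df['updateTime'] = pd.to_datetime(df['updateTime'], unit='ms')

# Keep only the most recent record for each province/city pair
latest = (df.sort_values('updateTime')
            .groupby(['provinceName', 'cityName'])
            .tail(1))
print(latest[['provinceName', 'cityName', 'city_confirmedCount', 'updateTime']].head())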
Data Download
If you run either of the solutions above, you will find you only get the current day's snapshot. What if you want the data over time? And what if you have never learned Python, or cannot set up the environment?
In fact, someone has already done this work, packaged it as CSV files, and updates them every day. How considerate is that?
Data address: https://github.com/BlankerL/DXY-2019-nCoV-Data
It contains both csv and json formats, so take whichever you need.
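You don't even need to clone the repository; pandas can read a raw file straight over HTTP. The URL below is an assumption about the file name and path, so check the repository for the files it actually publishes before relying on it.

import pandas as pd

# Hypothetical raw-file URL -- verify the actual file name and path in the repository
url = 'https://raw.githubusercontent.com/BlankerL/DXY-2019-nCoV-Data/master/DXYArea.csv'

# Read the daily-updated CSV directly from GitHub
df = pd.read_csv(url)
print(df.shape)
print(df.head())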
Disclaimer
1. All downloadable content shared in this article (including but not limited to software, data, and images) comes from the internet or was shared by 麻辣GIS readers; the copyright belongs to the legal owners of those resources. If anything here infringes your rights, please contact this site immediately and it will be removed.
2. Downloaded content is for personal study only. Do not use it for commercial or other purposes; otherwise you bear the consequences yourself.
Hi, the historical data in the link seems to start from February 5. Is there anything earlier, or how can I get earlier data? Thanks.
No, the earliest is January 24; look again carefully.
How do I fetch data with Python in the DXY-2019-nCoV-Data project? I keep running into problems, please advise!
What error did you get?
In that project there is a script.py file, and I installed the modules it needs, but running the line from github3 import login raises ImportError: cannot import name 'login' from 'github3' (D:\Users\hp\anaconda3\lib\site-packages\github3\__init__.py). I've searched for a long time without finding a fix.
I had a look, and the repo doesn't contain a file called script.py at all?
I'm writing a paper at the moment and need this data ^0^