pythonにて並列処理をするプログラムを作る機会がありました
いつもはthreadingというモジュールで並列処理をしておりプログラムもそのモジュールにて組みました
しかし、調べてみると最近はconcurrent.futuresというモジュールでやるのが良いということを知りました
なので勉強がてらやってみたのメモとして書いておきます
concurrent.futuresとは
Python3.2で追加された並列処理をするためのライブラリです
“ThreadPoolExecutor”とすればスレッドで並列処理を行います
“ProcessPoolExecutor”とすれば別々のプロセスで処理を行います
そのため、ひとつのモジュールをインポートすればスレッドとプロセスのどちらかで並列処理をしてくれます(個別でThreadingやmultiprocessingをインポートする必要がなくなった)
参考:https://docs.python.jp/3/library/concurrent.futures.html
プログラム
並列処理をするにあたり、以前に作成したWebスクレイピングするプログラムをベースに作成しました
tenki.jpとウェザーニュースの2つの情報をスレッドの並列処理でとってくるプログラムを作りました
参考:


また参考として逐次で天気情報を取得するプログラムも追加し、速度検証をしてみました
#!/usr/bin/python
# -*- Coding: utf-8 -*-
from concurrent import futures
import requests
import time
from bs4 import BeautifulSoup
#緯度経度を取得
def Location(address):
Url = "https://www.geocoding.jp/api/"
Params = {"q":address}
Req = requests.get(Url, params=Params)
Soup = BeautifulSoup(Req.text, 'lxml')
return Soup.find("lat").text, Soup.find("lng").text
#WeatherNewsの取得
def WeatherNews(address):
Url = "https://weathernews.jp/onebox/"
#緯度経度を取得
(Lat, Lng) = Location(address)
SrhUrl = Url + Lat + '/' + Lng + "/lang=ja"
Req = requests.get(SrhUrl)
Soup = BeautifulSoup(Req.text, 'lxml')
#Dict
myDict = {}
#天気
myDict["天気"] = Soup.find(class_="sub").text.split(", ")[1]
#気温
myDict["気温"] = Soup.find(class_="obs_temp_main").text
#風速とかの情報
for val in Soup.find(class_="table-obs_sub").find_all("tr"):
myDict[val.find(class_="obs_sub_title").text] = val.find(class_="obs_sub_value").text.replace(" : ","")
return myDict
#Tenkiの取得
def Tenki(address):
Url = "https://tenki.jp"
Req = requests.get(Url + "/search/?keyword=" + address)
Soup = BeautifulSoup(Req.text, 'lxml')
Sed = Soup.find_all(class_="search-entry-data")
HrfUrl = None
for val in Sed:
if val.find(class_="address").text.find("以下に掲載がない場合"):
HrfUrl = val.a.get("href")
break
myDict = {}
#住所からhrefを取得
if not(HrfUrl is None):
Req = requests.get(Url + HrfUrl)
Soup = BeautifulSoup(Req.text, 'lxml')
TodaySoup = Soup.find(class_="today-weather")
#気温(最高)
myDict["気温(最高)"] = TodaySoup.find(class_="weather-wrap").find(class_="high-temp temp").find(class_="value").text + TodaySoup.find(class_="weather-wrap").find(class_="high-temp tempdiff").text
#気温(最低)
myDict["気温(最低)"] = TodaySoup.find(class_="weather-wrap").find(class_="low-temp temp").find(class_="value").text + TodaySoup.find(class_="weather-wrap").find(class_="low-temp tempdiff").text
#天気
myDict["天気"] = TodaySoup.find(class_="weather-wrap").find(class_="weather-telop").text
#風
myDict["風"] = TodaySoup.find(class_="wind-wave").find("td").text
return myDict
def main(address):
print("Start No Thread Process")
start = time.time()
print(WeatherNews(address))
print(Tenki(address))
end = time.time()
print('time = {}'.format(end - start))
print("======")
time.sleep(10) #apiのcallのためのwait
print("Start Thread Process")
start = time.time()
myThreads = list()
with futures.ThreadPoolExecutor() as executor:
#Thread化
myThreads.append(executor.submit(WeatherNews, address))
myThreads.append(executor.submit(Tenki, address))
#結果を出力
for myThread in myThreads:
print(myThread.result())
end = time.time()
print('time = {}'.format(end - start))
if __name__ == '__main__':
main("石川県加賀市")
実行結果
約0.6秒くらい早くなりました
2つのサイトをスクレイピングしているだけなのでThreadしてもしなくても結果はかわりませんが、複数だったら大きく変わると思います
Start No Thread Process
{'天気': 'くもり', '気温': '24.1', '湿度': '70 %', '気圧': '1022 hPa', '風': '北北東 1 m/s', '日の出': '05:53', '日の入': '17:34'}
{'気温(最高)': '24[0]', '気温(最低)': '18[+3]', '天気': '曇', '風': '北の風後北東の風'}
time = 3.6397287845611572
======
Start Thread Process
{'天気': 'くもり', '気温': '24.1', '湿度': '70 %', '気圧': '1022 hPa', '風': '北北東 1 m/s', '日の出': '05:53', '日の入': '17:34'}
{'気温(最高)': '24[0]', '気温(最低)': '18[+3]', '天気': '曇', '風': '北の風後北東の風'}
time = 2.95491361618042
プログラムの解説
スクレイピング方法は前の記事にて書いたので省きます
重要なところは下記のところです
myThreads = list()
with futures.ThreadPoolExecutor() as executor:
#Thread化
myThreads.append(executor.submit(WeatherNews, address))
myThreads.append(executor.submit(Tenki, address))
#結果を出力
for myThread in myThreads:
print(myThread.result())
myThreadというリスト(配列)をつくり、その中にスレッド(executor.submit)を入れています
定義の仕方はexecutor.submit(スレッド化させる関数,引数)となっています
最後にfor文にて結果(myThread.result())を出力しています
またwith futures.ThreadPoolExecutor() as executor:のところを”with futures.ProcessPoolExecutor()“とするとプロセスで動きます
おまけプログラム
上記のプログラムは、“executor.submit”で逐次スレッド化するものを追加していました
しかしexecutor.mapを使うと一括でスレッド化するものを追加できます
おまけとして、myWaitTimeというリストのなかに1~10の値をいれ、.mapで一括追加して動かしてみました
#!/usr/bin/python
# -*- Coding: utf-8 -*-
import time
from concurrent import futures
def myWait(argv):
time.sleep(argv)
print("finish:", str(argv) + "sec")
def main():
myWaitTime = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
start = time.time()
with futures.ThreadPoolExecutor() as executor:
executor.map(myWait, myWaitTime)
end = time.time()
print('time = {}'.format(end - start))
if __name__ == '__main__':
main()
結果
finish: 1sec finish: 2sec finish: 3sec finish: 4sec finish: 5sec finish: 6sec finish: 7sec finish: 8sec finish: 9sec finish: 10sec time = 10.004080295562744
引数をリストで渡してしまえばおkなのところは便利です