掃二維碼與項目經(jīng)理溝通
我們在微信上24小時期待你的聲音
解答本文疑問/技術咨詢/運營咨詢/技術建議/互聯(lián)網(wǎng)交流
原創(chuàng)
作者:葉泳豪 2018-05-09 09:44:51
開發(fā)
后端
開發(fā)工具
分布式 在不用爬蟲框架的情況下,我經(jīng)過多方學習,嘗試實現(xiàn)了一個分布式爬蟲系統(tǒng),并且可以將數(shù)據(jù)保存到不同地方,類似 MySQL、HBase 等。

專注于為中小企業(yè)提供成都網(wǎng)站制作、成都網(wǎng)站設計服務,電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)南溪免費做網(wǎng)站提供優(yōu)質的服務。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上1000家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設實現(xiàn)規(guī)模擴充和轉變。
【51CTO.com原創(chuàng)稿件】在不用爬蟲框架的情況下,我經(jīng)過多方學習,嘗試實現(xiàn)了一個分布式爬蟲系統(tǒng),并且可以將數(shù)據(jù)保存到不同地方,類似 MySQL、HBase 等。
因為此系統(tǒng)基于面向接口的編碼思想來開發(fā),所以具有一定的擴展性,有興趣的朋友直接看一下代碼,就能理解其設計思想。
雖然代碼目前來說很多地方還是比較緊耦合,但只要花些時間和精力,很多都是可抽取出來并且可配置化的。
因為時間的關系,我只寫了京東和蘇寧易購兩個網(wǎng)站的爬蟲,但是完全可以實現(xiàn)不同網(wǎng)站爬蟲的隨機調度,基于其代碼結構,再寫國美、天貓等的商品爬取,難度不大,但是估計需要花些時間和精力。
因為在解析網(wǎng)頁的數(shù)據(jù)時,比如我在爬取蘇寧易購商品的價格時,價格是異步獲取的,并且其 API 是一長串的數(shù)字組合,我花了幾個小時的時間才發(fā)現(xiàn)其規(guī)律,當然也承認,我的經(jīng)驗不足。
這個系統(tǒng)的設計,除了基本的數(shù)據(jù)爬取以外,更關注以下幾個方面的問題:
下面會針對這個系統(tǒng)來做一個整體的基本介紹,我在代碼中都有非常詳細的注釋,有興趣的朋友可以參考一下代碼,***我會給出一些我爬蟲時的數(shù)據(jù)分析。
另外需要注意的是,這個爬蟲系統(tǒng)是基于 Java 實現(xiàn)的,但是語言本身仍然不是最重要的,有興趣的朋友可以嘗試用 Python 實現(xiàn)。
分布式爬蟲系統(tǒng)架構
整體系統(tǒng)架構如下:
從上面的架構可以看出,整個系統(tǒng)主要分為三個部分:
爬蟲系統(tǒng)是用來爬取數(shù)據(jù)的,因為系統(tǒng)設計為分布式,因此,爬蟲程序本身可以運行在不同的服務器節(jié)點上。
URL 調度系統(tǒng)核心在于 URL 倉庫,所謂的 URL 倉庫其實就是用 Redis 保存了需要爬取的 URL 列表,并且在我們的 URL 調度器中根據(jù)一定的策略來消費其中的 URL。從這個角度考慮,URL 倉庫其實也是一個 URL 隊列。
監(jiān)控報警系統(tǒng)主要是對爬蟲節(jié)點進行監(jiān)控,雖然并行執(zhí)行的爬蟲節(jié)點中的某一個掛掉了對整體數(shù)據(jù)爬取本身沒有影響(只是降低了爬蟲的速度),但是我們還是希望能夠主動接收到節(jié)點掛掉的通知,而不是被動地發(fā)現(xiàn)。
下面將針對以上三個方面并結合部分代碼片段來對整個系統(tǒng)的設計思路做一些基本的介紹。
爬蟲系統(tǒng)
爬蟲系統(tǒng)是一個獨立運行的進程,我們把我們的爬蟲系統(tǒng)打包成 jar 包,然后分發(fā)到不同的節(jié)點上執(zhí)行,這樣并行爬取數(shù)據(jù)可以提高爬蟲的效率。(說明:ZooKeeper 監(jiān)控屬于監(jiān)控報警系統(tǒng),URL 調度器屬于 URL 調度系統(tǒng))
隨機 IP 代理器
加入隨機 IP 代理主要是為了反反爬蟲,因此如果有一個 IP 代理庫,并且可以在構建 http 客戶端時隨機地使用不同的代理,那么對我們進行反反爬蟲會有很大的幫助。
在系統(tǒng)中使用 IP 代理庫,需要先在文本文件中添加可用的代理地址信息:
- # IPProxyRepository.txt
- 58.60.255.104:8118
- 219.135.164.245:3128
- 27.44.171.27:9999
- 219.135.164.245:3128
- 58.60.255.104:8118
- 58.252.6.165:9000
- ......
需要注意的是,上面的代理 IP 是我在西刺代理上拿到的一些代理 IP,不一定可用,建議是自己花錢購買一批代理 IP,這樣可以節(jié)省很多時間和精力去尋找代理 IP。
然后在構建 http 客戶端的工具類中,當***次使用工具類時,會把這些代理 IP 加載進內存中,加載到 Java 的一個 HashMap:
- // IP地址代理庫Map
- private static Map
IPProxyRepository = new HashMap<>(); - private static String[] keysArray = null; // keysArray是為了方便生成隨機的代理對象
- /**
- * 初次使用時使用靜態(tài)代碼塊將IP代理庫加載進set中
- */
- static {
- InputStream in = HttpUtil.class.getClassLoader().getResourceAsStream("IPProxyRepository.txt"); // 加載包含代理IP的文本
- // 構建緩沖流對象
- InputStreamReader isr = new InputStreamReader(in);
- BufferedReader bfr = new BufferedReader(isr);
- String line = null;
- try {
- // 循環(huán)讀每一行,添加進map中
- while ((line = bfr.readLine()) != null) {
- String[] split = line.split(":"); // 以:作為分隔符,即文本中的數(shù)據(jù)格式應為192.168.1.1:4893
- String host = split[0];
- int port = Integer.valueOf(split[1]);
- IPProxyRepository.put(host, port);
- }
- Set
keys = IPProxyRepository.keySet(); - keysArray = keys.toArray(new String[keys.size()]); // keysArray是為了方便生成隨機的代理對象
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
之后,在每次構建 http 客戶端時,都會先到 map 中看是否有代理 IP,有則使用,沒有則不使用代理:
- CloseableHttpClient httpClient = null;
- HttpHost proxy = null;
- if (IPProxyRepository.size() > 0) { // 如果ip代理地址庫不為空,則設置代理
- proxy = getRandomProxy();
- httpClient = HttpClients.custom().setProxy(proxy).build(); // 創(chuàng)建httpclient對象
- } else {
- httpClient = HttpClients.custom().build(); // 創(chuàng)建httpclient對象
- }
- HttpGet request = new HttpGet(url); // 構建htttp get請求
- ......
隨機代理對象則通過下面的方法生成:
- /**
- * 隨機返回一個代理對象
- *
- * @return
- */
- public static HttpHost getRandomProxy() {
- // 隨機獲取host:port,并構建代理對象
- Random random = new Random();
- String host = keysArray[random.nextInt(keysArray.length)];
- int port = IPProxyRepository.get(host);
- HttpHost proxy = new HttpHost(host, port); // 設置http代理
- return proxy;
- }
這樣,通過上面的設計,基本就實現(xiàn)了隨機 IP 代理器的功能,當然,其中還有很多可以完善的地方。
比如,當使用這個 IP 代理而請求失敗時,是否可以把這一情況記錄下來;當超過一定次數(shù)時,再將其從代理庫中刪除,同時生成日志供開發(fā)人員或運維人員參考,這是完全可以實現(xiàn)的,不過我就不做這一步功能了。
網(wǎng)頁下載器
網(wǎng)頁下載器就是用來下載網(wǎng)頁中的數(shù)據(jù),主要基于下面的接口開發(fā):
- /**
- * 網(wǎng)頁數(shù)據(jù)下載
- */
- public interface IDownload {
- /**
- * 下載給定url的網(wǎng)頁數(shù)據(jù)
- * @param url
- * @return
- */
- public Page download(String url);
- }
基于此,在系統(tǒng)中只實現(xiàn)了一個 http get 的下載器,但是也可以完成我們所需要的功能了:
- /**
- * 數(shù)據(jù)下載實現(xiàn)類
- */
- public class HttpGetDownloadImpl implements IDownload {
- @Override
- public Page download(String url) {
- Page page = new Page();
- String content = HttpUtil.getHttpContent(url); // 獲取網(wǎng)頁數(shù)據(jù)
- page.setUrl(url);
- page.setContent(content);
- return page;
- }
- }
網(wǎng)頁解析器
網(wǎng)頁解析器就是把下載的網(wǎng)頁中我們感興趣的數(shù)據(jù)解析出來,并保存到某個對象中,供數(shù)據(jù)存儲器進一步處理以保存到不同的持久化倉庫中,其基于下面的接口進行開發(fā):
- /**
- * 網(wǎng)頁數(shù)據(jù)解析
- */
- public interface IParser {
- public void parser(Page page);
- }
網(wǎng)頁解析器在整個系統(tǒng)的開發(fā)中也算是比較重頭戲的一個組件,功能不復雜,主要是代碼比較多,針對不同的商城不同的商品,對應的解析器可能就不一樣了。
因此需要針對特別的商城的商品進行開發(fā),因為很顯然,京東用的網(wǎng)頁模板跟蘇寧易購的肯定不一樣,天貓用的跟京東用的也肯定不一樣。
所以這個完全是看自己的需要來進行開發(fā)了,只是說,在解析器開發(fā)的過程當中會發(fā)現(xiàn)有部分重復代碼,這時就可以把這些代碼抽象出來開發(fā)一個工具類了。
目前在系統(tǒng)中爬取的是京東和蘇寧易購的手機商品數(shù)據(jù),因此就寫了這兩個實現(xiàn)類:
- /**
- * 解析京東商品的實現(xiàn)類
- */
- public class JDHtmlParserImpl implements IParser {
- ......
- }
- /**
- * 蘇寧易購網(wǎng)頁解析
- */
- public class SNHtmlParserImpl implements IParser {
- ......
- }
數(shù)據(jù)存儲器
數(shù)據(jù)存儲器主要是將網(wǎng)頁解析器解析出來的數(shù)據(jù)對象保存到不同的表格,而對于本次爬取的手機商品,數(shù)據(jù)對象是下面一個 Page 對象:
- /**
- * 網(wǎng)頁對象,主要包含網(wǎng)頁內容和商品數(shù)據(jù)
- */
- public class Page {
- private String content; // 網(wǎng)頁內容
- private String id; // 商品Id
- private String source; // 商品來源
- private String brand; // 商品品牌
- private String title; // 商品標題
- private float price; // 商品價格
- private int commentCount; // 商品評論數(shù)
- private String url; // 商品地址
- private String imgUrl; // 商品圖片地址
- private String params; // 商品規(guī)格參數(shù)
- private List
urls = new ArrayList<>(); // 解析列表頁面時用來保存解析的商品url的容器 - }
對應的,在 MySQL 中,表數(shù)據(jù)結構如下:
- -- ----------------------------
- -- Table structure for phone
- -- ----------------------------
- DROP TABLE IF EXISTS `phone`;
- CREATE TABLE `phone` (
- `id` varchar(30) CHARACTER SET armscii8 NOT NULL COMMENT '商品id',
- `source` varchar(30) NOT NULL COMMENT '商品來源,如jd suning gome等',
- `brand` varchar(30) DEFAULT NULL COMMENT '手機品牌',
- `title` varchar(255) DEFAULT NULL COMMENT '商品頁面的手機標題',
- `price` float(10,2) DEFAULT NULL COMMENT '手機價格',
- `comment_count` varchar(30) DEFAULT NULL COMMENT '手機評論',
- `url` varchar(500) DEFAULT NULL COMMENT '手機詳細信息地址',
- `img_url` varchar(500) DEFAULT NULL COMMENT '圖片地址',
- `params` text COMMENT '手機參數(shù),json格式存儲',
- PRIMARY KEY (`id`,`source`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
而在 HBase 中的表結構則為如下:
- ## cf1 存儲 id source price comment brand url
- ## cf2 存儲 title params imgUrl
- create 'phone', 'cf1', 'cf2'
- ## 在HBase shell中查看創(chuàng)建的表
- hbase(main):135:0> desc 'phone'
- Table phone is ENABLED
- phone
- COLUMN FAMILIES DESCRIPTION
- {NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK
- _ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>
- '65536', REPLICATION_SCOPE => '0'}
- {NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK
- _ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>
- '65536', REPLICATION_SCOPE => '0'}
- 2 row(s) in 0.0350 seconds
即在 HBase 中建立了兩個列族,分別為 cf1、cf2,其中 cf1 用來保存 id source price comment brand url 字段信息;cf2 用來保存 title params imgUrl 字段信息。
不同的數(shù)據(jù)存儲用的是不同的實現(xiàn)類,但是其都是基于下面同一個接口開發(fā)的:
- /**
- * 商品數(shù)據(jù)的存儲
- */
- public interface IStore {
- public void store(Page page);
- }
然后基于此開發(fā)了 MySQL 的存儲實現(xiàn)類、HBase 的存儲實現(xiàn)類還有控制臺的輸出實現(xiàn)類,如 MySQL 的存儲實現(xiàn)類,其實就是簡單的數(shù)據(jù)插入語句:
- /**
- * 使用dbc數(shù)據(jù)庫連接池將數(shù)據(jù)寫入mysql表中
- */
- public class MySQLStoreImpl implements IStore {
- private QueryRunner queryRunner = new QueryRunner(DBCPUtil.getDataSource());
- @Override
- public void store(Page page) {
- String sql = "insert into phone(id, source, brand, title, price, comment_count, url, img_url, params) values(?, ?, ?, ?, ?, ?, ?, ?, ?)";
- try {
- queryRunner.update(sql, page.getId(),
- page.getSource(),
- page.getBrand(),
- page.getTitle(),
- page.getPrice(),
- page.getCommentCount(),
- page.getUrl(),
- page.getImgUrl(),
- page.getParams());
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- }
而 HBase 的存儲實現(xiàn)類,則是 HBase Java API 的常用插入語句代碼:
- ......
- // cf1:price
- Put pricePut = new Put(rowKey);
- // 必須要做是否為null判斷,否則會有空指針異常
- pricePut.addColumn(cf1, "price".getBytes(), page.getPrice() != null ? String.valueOf(page.getPrice()).getBytes() : "".getBytes());
- puts.add(pricePut);
- // cf1:comment
- Put commentPut = new Put(rowKey);
- commentPut.addColumn(cf1, "comment".getBytes(), page.getCommentCount() != null ? String.valueOf(page.getCommentCount()).getBytes() : "".getBytes());
- puts.add(commentPut);
- // cf1:brand
- Put brandPut = new Put(rowKey);
- brandPut.addColumn(cf1, "brand".getBytes(), page.getBrand() != null ? page.getBrand().getBytes() : "".getBytes());
- puts.add(brandPut);
- ......
當然,至于要將數(shù)據(jù)存儲在哪個地方,在初始化爬蟲程序時,是可以手動選擇的:
- // 3.注入存儲器
- iSpider.setStore(new HBaseStoreImpl());
目前還沒有把代碼寫成可以同時存儲在多個地方,按照目前代碼的架構,要實現(xiàn)這一點也比較簡單,修改一下相應代碼就好了。
實際上,是可以先把數(shù)據(jù)保存到 MySQL 中,然后通過 Sqoop 導入到 HBase 中,詳細操作可以參考我寫的 Sqoop 文章。
仍然需要注意的是,如果確定需要將數(shù)據(jù)保存到 HBase 中,請保證你有可用的集群環(huán)境,并且需要將如下配置文檔添加到 classpath 下:
- core-site.xml
- hbase-site.xml
- hdfs-site.xml
對大數(shù)據(jù)感興趣的同學可以折騰一下這一點,如果之前沒有接觸過的,直接使用 MySQL 存儲就好了,只需要在初始化爬蟲程序時注入 MySQL 存儲器即可:
- // 3.注入存儲器
- iSpider.setStore(new MySQLStoreImpl());
URL 調度系統(tǒng)
URL 調度系統(tǒng)是實現(xiàn)整個爬蟲系統(tǒng)分布式的橋梁與關鍵,正是通過 URL 調度系統(tǒng)的使用,才使得整個爬蟲系統(tǒng)可以較為高效(Redis 作為存儲)隨機地獲取 URL,并實現(xiàn)整個系統(tǒng)的分布式。
URL 倉庫
通過架構圖可以看出,所謂的 URL 倉庫不過是 Redis 倉庫,即在我們的系統(tǒng)中使用 Redis 來保存 URL 地址列表。
正是這樣,才能保證我們的程序實現(xiàn)分布式,只要保存了 URL 是唯一的,這樣不管我們的爬蟲程序有多少個,最終保存下來的數(shù)據(jù)都是只有唯一一份的,而不會重復。
同時 URL 倉庫中的 URL 地址在獲取時的策略是通過隊列的方式來實現(xiàn)的,待會通過 URL 調度器的實現(xiàn)即可知道。
另外,在我們的 URL 倉庫中,主要保存了下面的數(shù)據(jù):
種子 URL 列表,Redis 的數(shù)據(jù)類型為 list
種子 URL 是持久化存儲的,一定時間后,由 URL 定時器通過種子 URL 獲取 URL,并將其注入到我們的爬蟲程序需要使用的高優(yōu)先級 URL 隊列中。
這樣就可以保證我們的爬蟲程序可以源源不斷地爬取數(shù)據(jù)而不需要中止程序的執(zhí)行。
高優(yōu)先級 URL 隊列,Redis 的數(shù)據(jù)類型為 set
什么是高優(yōu)先級 URL 隊列?其實它就是用來保存列表 URL 的。那么什么是列表 URL 呢?
說白了就是一個列表中含有多個商品,以京東為例,我們打開一個手機列表:
該地址中包含的不是一個具體商品的 URL,而是包含了多個我們需要爬取的數(shù)據(jù)(手機商品)的列表。
通過對每個高級 URL 的解析,我們可以獲取到非常多的具體商品 URL,而具體的商品 URL,就是低優(yōu)先 URL,其會保存到低優(yōu)先級 URL 隊列中。
那么以這個系統(tǒng)為例,保存的數(shù)據(jù)類似如下:
- jd.com.higher
- --https://list.jd.com/list.html?cat=9987,653,655&page=1
- ...
- suning.com.higher
- --https://list.suning.com/0-20006-0.html
- ...
低優(yōu)先級 URL 隊列,Redis 的數(shù)據(jù)類型為 set
低優(yōu)先級 URL 其實就是具體某個商品的 URL,如下面一個手機商品:
通過下載該 URL 的數(shù)據(jù),并對其進行解析,就能夠獲取到我們想要的數(shù)據(jù)。
那么以這個系統(tǒng)為例,保存的數(shù)據(jù)類似如下:
- jd.com.lower
- --https://item.jd.com/23545806622.html
- ...
- suning.com.lower
- --https://product.suning.com/0000000000/690128156.html
- ...
URL 調度器
所謂 URL 調度器,就是 URL 倉庫 Java 代碼的調度策略,不過因為其核心在于調度,所以將其放到 URL 調度器中來進行說明,目前其調度基于以下接口開發(fā):
- /**
- * url 倉庫
- * 主要功能:
- * 向倉庫中添加url(高優(yōu)先級的列表,低優(yōu)先級的商品url)
- * 從倉庫中獲取url(優(yōu)先獲取高優(yōu)先級的url,如果沒有,再獲取低優(yōu)先級的url)
- *
- */
- public interface IRepository {
- /**
- * 獲取url的方法
- * 從倉庫中獲取url(優(yōu)先獲取高優(yōu)先級的url,如果沒有,再獲取低優(yōu)先級的url)
- * @return
- */
- public String poll();
- /**
- * 向高優(yōu)先級列表中添加商品列表url
- * @param highUrl
- */
- public void offerHigher(String highUrl);
- /**
- * 向低優(yōu)先級列表中添加商品url
- * @param lowUrl
- */
- public void offerLower(String lowUrl);
- }
其基于 Redis 作為 URL 倉庫的實現(xiàn)如下:
- /**
- * 基于Redis的全網(wǎng)爬蟲,隨機獲取爬蟲url:
- *
- * Redis中用來保存url的數(shù)據(jù)結構如下:
- * 1.需要爬取的域名集合(存儲數(shù)據(jù)類型為set,這個需要先在Redis中添加)
- * key
- * spider.website.domains
- * value(set)
- * jd.com suning.com gome.com
- * key由常量對象SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY 獲得
- * 2.各個域名所對應的高低優(yōu)先url隊列(存儲數(shù)據(jù)類型為list,這個由爬蟲程序解析種子url后動態(tài)添加)
- * key
- * jd.com.higher
- * jd.com.lower
- * suning.com.higher
- * suning.com.lower
- * gome.com.higher
- * gome.come.lower
- * value(list)
- * 相對應需要解析的url列表
- * key由隨機的域名 + 常量 SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX或者SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX獲得
- * 3.種子url列表
- * key
- * spider.seed.urls
- * value(list)
- * 需要爬取的數(shù)據(jù)的種子url
- * key由常量SpiderConstants.SPIDER_SEED_URLS_KEY獲得
- *
- * 種子url列表中的url會由url調度器定時向高低優(yōu)先url隊列中
- */
- public class RandomRedisRepositoryImpl implements IRepository {
- /**
- * 構造方法
- */
- public RandomRedisRepositoryImpl() {
- init();
- }
- /**
- * 初始化方法,初始化時,先將redis中存在的高低優(yōu)先級url隊列全部刪除
- * 否則上一次url隊列中的url沒有消耗完時,再停止啟動跑下一次,就會導致url倉庫中有重復的url
- */
- public void init() {
- Jedis jedis = JedisUtil.getJedis();
- Set
domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); - String higherUrlKey;
- String lowerUrlKey;
- for(String domain : domains) {
- higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;
- lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;
- jedis.del(higherUrlKey, lowerUrlKey);
- }
- JedisUtil.returnJedis(jedis);
- }
- /**
- * 從隊列中獲取url,目前的策略是:
- * 1.先從高優(yōu)先級url隊列中獲取
- * 2.再從低優(yōu)先級url隊列中獲取
- * 對應我們的實際場景,應該是先解析完列表url再解析商品url
- * 但是需要注意的是,在分布式多線程的環(huán)境下,肯定是不能完全保證的,因為在某個時刻高優(yōu)先級url隊列中
- * 的url消耗完了,但實際上程序還在解析下一個高優(yōu)先級url,此時,其它線程去獲取高優(yōu)先級隊列url肯定獲取不到
- * 這時就會去獲取低優(yōu)先級隊列中的url,在實際考慮分析時,這點尤其需要注意
- * @return
- */
- @Override
- public String poll() {
- // 從set中隨機獲取一個***域名
- Jedis jedis = JedisUtil.getJedis();
- String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); // jd.com
- String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; // jd.com.higher
- String url = jedis.lpop(key);
- if(url == null) { // 如果為null,則從低優(yōu)先級中獲取
- key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; // jd.com.lower
- url = jedis.lpop(key);
- }
- JedisUtil.returnJedis(jedis);
- return url;
- }
- /**
- * 向高優(yōu)先級url隊列中添加url
- * @param highUrl
- */
- @Override
- public void offerHigher(String highUrl) {
- offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX);
- }
- /**
- * 向低優(yōu)先url隊列中添加url
- * @param lowUrl
- */
- @Override
- public void offerLower(String lowUrl) {
- offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX);
- }
- /**
- * 添加url的通用方法,通過offerHigher和offerLower抽象而來
- * @param url 需要添加的url
- * @param urlTypeSuffix url類型后綴.higher或.lower
- */
- public void offerUrl(String url, String urlTypeSuffix) {
- Jedis jedis = JedisUtil.getJedis();
- String domain = SpiderUtil.getTopDomain(url); // 獲取url對應的***域名,如jd.com
- String key = domain + urlTypeSuffix; // 拼接url隊列的key,如jd.com.higher
- jedis.lpush(key, url); // 向url隊列中添加url
- JedisUtil.returnJedis(jedis);
- }
- }
通過代碼分析也可以知道,其核心就在如何調度 URL 倉庫(Redis)中的 URL。
URL 定時器
一段時間后,高優(yōu)先級 URL 隊列和低優(yōu)先 URL 隊列中的 URL 都會被消費完。
為了讓程序可以繼續(xù)爬取數(shù)據(jù),同時減少人為的干預,可以預先在 Redis 中插入種子 URL,之后定時讓 URL 定時器從種子 URL 中取出 URL 存放到高優(yōu)先級 URL 隊列中,以此達到程序定時不間斷爬取數(shù)據(jù)的目的。
URL 消費完畢后,是否需要循環(huán)不斷爬取數(shù)據(jù)根據(jù)個人業(yè)務需求而不同,因此這一步不是必需的,只是也提供了這樣的操作。
因為事實上,我們需要爬取的數(shù)據(jù)也是每隔一段時間就會更新的,如果希望我們爬取的數(shù)據(jù)也跟著定時更新,那么這時定時器就有非常重要的作用了。
不過需要注意的是,一旦決定需要循環(huán)重復爬取數(shù)據(jù),則在設計存儲器實現(xiàn)時需要考慮重復數(shù)據(jù)的問題,即重復數(shù)據(jù)應該是更新操作。
目前在我設計的存儲器不包括這個功能,有興趣的朋友可以自己實現(xiàn),只需要在插入數(shù)據(jù)前判斷數(shù)據(jù)庫中是否存在該數(shù)據(jù)即可。
另外需要注意的一點是,URL 定時器是一個獨立的進程,需要單獨啟動。
定時器基于 Quartz 實現(xiàn),下面是其 job 的代碼:
- /**
- * 每天定時從url倉庫中獲取種子url,添加進高優(yōu)先級列表
- */
- public class UrlJob implements Job {
- // log4j日志記錄
- private Logger logger = LoggerFactory.getLogger(UrlJob.class);
- @Override
- public void execute(JobExecutionContext context) throws JobExecutionException {
- /**
- * 1.從指定url種子倉庫獲取種子url
- * 2.將種子url添加進高優(yōu)先級列表
- */
- Jedis jedis = JedisUtil.getJedis();
- Set
seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY); // spider.seed.urls Redis數(shù)據(jù)類型為set,防止重復添加種子url - for(String seedUrl : seedUrls) {
- String domain = SpiderUtil.getTopDomain(seedUrl); // 種子url的***域名
- jedis.sadd(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl);
- logger.info("獲取種子:{}", seedUrl);
- }
- JedisUtil.returnJedis(jedis);
- // System.out.println("Scheduler Job Test...");
- }
- }
調度器的實現(xiàn)如下:
- /**
- * url定時調度器,定時向url對應倉庫中存放種子url
- *
- * 業(yè)務規(guī)定:每天凌晨1點10分向倉庫中存放種子url
- */
- public class UrlJobScheduler {
- public UrlJobScheduler() {
- init();
- }
- /**
- * 初始化調度器
- */
- public void init() {
- try {
- Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
- // 如果沒有以下start方法的執(zhí)行,則是不會開啟任務的調度
當前名稱:手把手教你搭建一個基于Java的分布式爬蟲系統(tǒng)
鏈接URL:http://uogjgqi.cn/article/dpgpddd.html

我們在微信上24小時期待你的聲音
解答本文疑問/技術咨詢/運營咨詢/技術建議/互聯(lián)網(wǎng)交流