Python笔记 - Scrapy爬虫(二) Pipeline Mysql

/ 技术文章 / 0 条评论 / 620浏览

Python笔记 - Scrapy爬虫(二) Pipeline Mysql

上次的爬虫,只把数据寄到文件当中,这次将试着将数据存到Mysql的数据库当中。其中,将会用到数据库链接和连接池 参考: https://docs.scrapy.org/en/latest/topics/item-pipeline.html

创建数据库

mysql -uroot -p

mysql> create database spider DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
mysql> GRANT ALL PRIVILEGES ON spider.* TO 'spider'@'%' IDENTIFIED BY 'Spider@2018';

CREATE TABLE `spider`.`minimp4` (
  `id` VARCHAR(45) NOT NULL COMMENT 'UUID',
  `name` VARCHAR(128) NOT NULL COMMENT '电影名称',
  `region` VARCHAR(45) NULL COMMENT '制片地区',
  `language` VARCHAR(45) NULL COMMENT '语言',
  `release_time` VARCHAR(45) NULL COMMENT '上映时间',
  `duaration` VARCHAR(45) NULL COMMENT '片长',
  `douban_point` VARCHAR(45) NULL COMMENT '豆瓣评分',
  `imdb_point` VARCHAR(45) NULL COMMENT 'IMDB评分',
  `details` TEXT NULL COMMENT '所有抓取内容JSON',
  PRIMARY KEY (`id`),
  UNIQUE INDEX `name_UNIQUE` (`name` ASC))
COMMENT = 'Spider contents for minimp4';

依赖库

Python 3.5+

python3 -m pip install --upgrade pymysql
import pymysql.cursors

Python 3.4-

python3 -m pip install --upgrade PyMySQL
import MySQLdb

实现Pipeline

先在stting.py面设置数据库配置,等会可以直接调用

MYSQL_HOST = 'localhost'
MYSQL_DBNAME = 'spider'
MYSQL_USER = 'spider'
MYSQL_PASSWD = 'Spider@2018'
MYSQL_PORT = 3306

简单写法

覆写from_crawler(cls, crawler) 以及 __init__(self, settings)方法获取数据库配置

open_spider中打开数据库连接,close_spider中关闭连接

process_item中拼写数据库执行语句,表中插入数据

pipelines.py

import json
import pymysql

class MiniMp4MysqlPipeline(object):
    insert_sql = '''insert into minimp4(
        id, 
        name, 
        region, 
        language, 
        release_time, 
        duaration, 
        douban_point, 
        imdb_point, 
        details
        ) values (
           uuid(),
            '{name}',
            '{region}',
            '{language}',
            '{release_time}',
            '{duaration}',
            '{douban_point}',
            '{imdb_point}',
            '{details}'
        );'''

    # 初始化时获取 setting.py 配置
    def __init__(self, settings):
        self.settings = settings

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)

    # 打开spider时,初始化熟虑库连接
    # 定义from_crawler方法后,scrapy将不会调用 __init__(self, *args, **kwargs)
    def open_spider(self, *args, **kwargs):
       # 连接数据库
       self.connect = pymysql.connect(
           host=self.settings.get('MYSQL_HOST'),#数据库地址
           port=self.settings.get('MYSQL_PORT'),# 数据库端口
           db=self.settings.get('MYSQL_DBNAME'), # 数据库名
           user = self.settings.get('MYSQL_USER'), # 数据库用户名
           passwd=self.settings.get('MYSQL_PASSWD'), # 数据库密码
           charset='utf8', # 编码方式
           use_unicode=True)
       # 通过cursor执行增删查改
       self.cursor = self.connect.cursor();
       self.connect.autocommit(True)

    def process_item(self, item, spider):
        try:
            sqltext = self.insert_sql.format(
                name=pymysql.escape_string(item['name'][0]),
                region=pymysql.escape_string(item['region'][0]),
                language=pymysql.escape_string(item['language'][0]),
                release_time=pymysql.escape_string(item['release_time'][0]),
                duaration=pymysql.escape_string(item['duaration'][0]),
                douban_point=pymysql.escape_string(item['douban_point'][0]),
                imdb_point=pymysql.escape_string(item['imdb_point'][0]),
                details=pymysql.escape_string(json.dumps(dict(item), ensure_ascii=False)))
            self.cursor.execute(sqltext)
        except pymysql.err.IntegrityError as e:
            print('ERROR - Insert data failed: [{0}]'.format(repr(e)))
        except:
             print('Unknown error')
        else:
            print('Insert data successfully!')
        return item

    # Override close_spider, 
    # 覆写方法,千万不要改名字
    def close_spider(self, spider):
        self.cursor.close()
        self.connect.close()

settings.py

ITEM_PIPELINES = {
   'tb_spider.pipelines.MiniMp4MysqlPipeline': 200,
  #  'tb_spider.pipelines.MiniMp4SpiderPipeline': 300,
}

连接池写法

相比直接插入db,需要多导入一个库 from twisted.enterprise import adbapi

from_settings(cls, settings) 从settings.py获取设置,初始化连接池设置,并将连接池托管给类

__init__(self, dbpool) 获取连接池

process_item覆写item处理方法, 使用twisted将mysql插入变成异步执行

do_insert 覆写sql执行方法

handle_error 覆写异常处理方法

get_insert_sql拼写插入语句,如果把该方法放到item配置里,pipeline就能变成通用pipeline

import json
import pymysql
from twisted.enterprise import adbapi

class MiniMp4MysqlPoolPipeline(object):

    # 初始化时获取 setting.py 配置
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparams = dict(
            host=settings.get('MYSQL_HOST'),#数据库地址
            port=settings.get('MYSQL_PORT'),# 数据库端口
            db=settings.get('MYSQL_DBNAME'), # 数据库名
            user=settings.get('MYSQL_USER'), # 数据库用户名
            passwd=settings.get('MYSQL_PASSWD'), # 数据库密码
            charset='utf8', # 编码方式
            cursorclass=pymysql.cursors.DictCursor, # 指定 curosr 类型
            use_unicode=True
        )
        dbpool = adbapi.ConnectionPool('pymysql',**dbparams)
        return cls(dbpool) # 相当于dbpool付给了这个类,self中可以得到

    # 使用twisted将mysql插入变成异步执行
    def process_item(self, item, spider):
        # 指定操作方法和操作的数据
        query = self.dbpool.runInteraction(self.do_insert, item)
        # 指定异常处理方法
        query.addErrback(self.handle_error, item, spider) 

    def handle_error(self, failure, item, spider):
        #处理异步插入的异常
        print (failure)
        print("ERROR - Following item cannot be inserted into db:")
        print(repr(item))

    def do_insert(self, cursor, item):
        #执行具体的插入
        #根据不同的item 构建不同的sql语句并插入到mysql中
        # 如果把该方法放到item配置里,pipeline就能变成通用pipeline
        # insert_sql,params=item.get_insert_sql() 
        insert_sql,params=self.get_insert_sql(item)
        cursor.execute(insert_sql,params)

    def get_insert_sql(self, item):
        insert_sql =  '''insert into minimp4(
        id, 
        name, 
        region, 
        language, 
        release_time, 
        duaration, 
        douban_point, 
        imdb_point, 
        details
        ) values (
           uuid(), %s, %s, %s, %s, %s, %s, %s, %s
        );'''	
        params = (
            pymysql.escape_string( item['name'][0] if len(item['name'])>0 else ''),
            pymysql.escape_string(item['region'][0] if len(item['region'])>0 else ''),
            pymysql.escape_string(item['language'][0] if len(item['language'])>0 else ''),
            pymysql.escape_string(item['release_time'][0] if len(item['release_time'])>0 else ''),
            pymysql.escape_string(item['duaration'][0] if len(item['duaration'])>0 else ''),
            pymysql.escape_string(item['douban_point'][0] if len(item['douban_point'])>0 else ''),
            pymysql.escape_string(item['imdb_point'][0] if len(item['imdb_point'])>0 else ''),
            pymysql.escape_string(json.dumps(dict(item), ensure_ascii=False))
        )
        return (insert_sql,params)