# Copyright (c) 2017–2018 crocoite contributors
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

"""
Controller classes, handling actions required for archival
"""

class ControllerSettings:
    """ Timeouts (in seconds) controlling when a page is considered done """

    __slots__ = ('idleTimeout', 'timeout')

    def __init__ (self, idleTimeout=2, timeout=10):
        self.idleTimeout = idleTimeout
        self.timeout = timeout

    def toDict (self):
        return dict (idleTimeout=self.idleTimeout, timeout=self.timeout)

defaultSettings = ControllerSettings ()
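
# A more patient configuration could be constructed like this (a sketch; the
# name and values are arbitrary):
#   patientSettings = ControllerSettings (idleTimeout=5, timeout=60)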

class EventHandler:
    """ Abstract base class for event handler """

    __slots__ = ()

    # this handler wants to know about exceptions before they are reraised by
    # the controller
    acceptException = False

    def push (self, item):
        raise NotImplementedError ()
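
# A handler receives every item produced during archival through push(). A
# minimal concrete handler might look like this (a sketch; PrintHandler is not
# part of this module, see StatsHandler and LogHandler for real ones):
#   class PrintHandler (EventHandler):
#       __slots__ = ()
#       def push (self, item):
#           print (item)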

class StatsHandler (EventHandler):
    """ Accumulate simple request statistics """

    __slots__ = ('stats',)

    acceptException = True

    def __init__ (self):
        self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0}

    def push (self, item):
        if isinstance (item, Item):
            self.stats['requests'] += 1
            if item.failed:
                self.stats['failed'] += 1
            else:
                self.stats['finished'] += 1
                self.stats['bytesRcv'] += item.encodedDataLength

from .behavior import ExtractLinksEvent
from itertools import islice

class LogHandler (EventHandler):
    """ Handle items by logging information about them """

    __slots__ = ('logger',)

    def __init__ (self, logger):
        self.logger = logger.bind (context=type (self).__name__)

    def push (self, item):
        if isinstance (item, ExtractLinksEvent):
            # limit the number of links per message, so the json blob won’t get too big
            it = iter (item.links)
            limit = 100
            while True:
                limitlinks = list (islice (it, 0, limit))
                if not limitlinks:
                    break
                self.logger.info ('extracted links', context=type (item).__name__,
                        uuid='8ee5e9c9-1130-4c5c-88ff-718508546e0c', links=limitlinks)

import time, platform, asyncio

from . import behavior as cbehavior
from .browser import ChromeService, SiteLoader, Item
from .util import getFormattedViewportMetrics, getRequirements

class ControllerStart:
    """ First event pushed to each handler; payload describes software and browser """

    __slots__ = ('payload',)

    def __init__ (self, payload):
        self.payload = payload

class SinglePageController:
    """
    Archive a single page URL to a file.

    Dispatches items between the producer (site loader and behavior scripts)
    and the consumers (stats, warc writer).
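
    Usage sketch (assumes an asyncio event loop driver; logger is a
    structlog-style logger providing ``bind``, as used throughout this
    module):

        controller = SinglePageController (url, output, logger,
                handler=[StatsHandler (), LogHandler (logger)])
        asyncio.get_event_loop ().run_until_complete (controller.run ())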
    """

    __slots__ = ('url', 'output', 'service', 'behavior', 'settings', 'logger', 'handler')

    def __init__ (self, url, output, logger,
            service=None, behavior=cbehavior.available,
            settings=defaultSettings, handler=None):
        self.url = url
        self.output = output
        # avoid mutable/stateful default arguments: a ChromeService or handler
        # list created at definition time would be shared by all instances
        self.service = service if service is not None else ChromeService ()
        self.behavior = behavior
        self.settings = settings
        self.logger = logger.bind (context=type (self).__name__, url=url)
        self.handler = handler if handler is not None else []

    def processItem (self, item):
        for h in self.handler:
            h.push (item)

    async def run (self):
        logger = self.logger
        async def processQueue ():
            async for item in l:
                self.processItem (item)

        with self.service as browser:
            async with SiteLoader (browser, self.url, logger=logger) as l:
                handle = asyncio.ensure_future (processQueue ())

                start = time.time ()

                version = await l.tab.Browser.getVersion ()
                payload = {
                        'software': {
                            'platform': platform.platform (),
                            'python': {
                                'implementation': platform.python_implementation(),
                                'version': platform.python_version (),
                                'build': platform.python_build ()
                                },
                            'self': getRequirements (__package__)
                            },
                        'browser': {
                            'product': version['product'],
                            'useragent': version['userAgent'],
                            'viewport': await getFormattedViewportMetrics (l.tab),
                            },
                        }
                self.processItem (ControllerStart (payload))

                # not all behavior scripts are allowed for every URL; each
                # instantiated script decides through its __contains__ whether
                # it applies to this one
                enabledBehavior = list (filter (lambda x: self.url in x,
                        map (lambda x: x (l, logger), self.behavior)))

                for b in enabledBehavior:
                    async for item in b.onload ():
                        self.processItem (item)
                await l.start ()

                # XXX: this does not detect idle changes properly
                idleSince = None
                while True:
                    now = time.time ()
                    runtime = now - start
                    if runtime >= self.settings.timeout or \
                            (idleSince is not None and now - idleSince > self.settings.idleTimeout):
                        break
                    if len (l) == 0:
                        if idleSince is None:
                            idleSince = now
                    else:
                        idleSince = None
                    await asyncio.sleep (1)
                await l.tab.Page.stopLoading ()

                for b in enabledBehavior:
                    async for item in b.onstop ():
                        self.processItem (item)

                await asyncio.sleep (1)

                for b in enabledBehavior:
                    async for item in b.onfinish ():
                        self.processItem (item)

                # drain the queue XXX detect idle properly
                i = 0
                while len (l) and i < 20:
                    i += 1
                    await asyncio.sleep (1)

                if handle.done ():
                    # re-raise any exception from the queue processor
                    handle.result ()
                else:
                    handle.cancel ()

class RecursionPolicy:
    """
    Abstract recursion policy

    Called with a set of candidate URLs, it returns those that shall be
    visited next.
    """

    __slots__ = ()

    def __call__ (self, urls):
        raise NotImplementedError

class DepthLimit (RecursionPolicy):
    """
    Limit recursion by depth.
    
    depth==0 means no recursion, depth==1 is the page and outgoing links
    """

    __slots__ = ('maxdepth')

    def __init__ (self, maxdepth=0):
        if maxdepth < 0 or maxdepth > 1:
            raise ValueError ('Unsupported')
        self.maxdepth = maxdepth

    def __call__ (self, urls):
        if self.maxdepth <= 0:
            return set ()
        else:
            # the policy is stateful: each call consumes one level of depth
            self.maxdepth -= 1
            return urls

    def __repr__ (self):
        return '<DepthLimit {}>'.format (self.maxdepth)
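
# For instance, DepthLimit (1) passes candidate URLs through once and returns
# nothing afterwards (a sketch; the URLs are arbitrary):
#   policy = DepthLimit (1)
#   policy ({'http://example.com/a'})  # -> {'http://example.com/a'}
#   policy ({'http://example.com/b'})  # -> set ()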

class PrefixLimit (RecursionPolicy):
    """
    Limit recursion by URL prefix.

    E.g. with prefix=http://example.com/foo
    ignored: http://example.com/bar http://offsite.example/foo
    accepted: http://example.com/foobar http://example.com/foo/bar
    """

    __slots__ = ('prefix',)

    def __init__ (self, prefix):
        self.prefix = prefix

    def __call__ (self, urls):
        return set (filter (lambda u: u.startswith (self.prefix), urls))
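
# For instance (matching the docstring above; a sketch):
#   policy = PrefixLimit ('http://example.com/foo')
#   policy ({'http://example.com/foobar', 'http://example.com/bar'})
#   # -> {'http://example.com/foobar'}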

import tempfile, json, os
from datetime import datetime
from urllib.parse import urlparse
from .util import removeFragment

class RecursiveController:
    """
    Simple recursive controller

    Visits links according to the given policy
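
    Usage sketch ({url} and {dest} in command are replaced for every fetch,
    see fetch() below; the exact crocoite-grab invocation is an assumption):

        controller = RecursiveController (url, outputDir,
                ['crocoite-grab', '{url}', '{dest}'], logger,
                policy=PrefixLimit (url))
        asyncio.get_event_loop ().run_until_complete (controller.run ())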
    """

    __slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
            'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency', '_quit')

    SCHEME_WHITELIST = {'http', 'https'}

    def __init__ (self, url, output, command, logger, prefix='{host}-{date}-',
            tempdir=None, policy=None, concurrency=1):
        self.url = url
        self.output = output
        self.command = command
        self.prefix = prefix
        self.logger = logger.bind (context=type (self).__name__, seedurl=url)
        # policies can be stateful (see DepthLimit), so a default instance
        # must not be shared between controllers
        self.policy = policy if policy is not None else DepthLimit (0)
        self.tempdir = tempdir
        # tasks currently running
        self.running = set ()
        # max number of tasks running
        self.concurrency = concurrency
        # keep in sync with StatsHandler
        self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0, 'ignored': 0}
        # initiate graceful shutdown
        self._quit = False

    async def fetch (self, url):
        """
        Fetch a single URL using an external command

        command is usually crocoite-grab
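
        The subprocess is expected to write newline-delimited JSON objects to
        stdout; events are matched by their uuid field, e.g. (abridged):

            {"uuid": "8ee5e9c9-1130-4c5c-88ff-718508546e0c", "links": [...]}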
        """

        logger = self.logger.bind (url=url)

        def formatCommand (e):
            # substitute the {url} and {dest} placeholders in the command template
            return e.format (url=url, dest=dest.name)

        def formatPrefix (p):
            # e.g. '{host}-{date}-' becomes 'example.com-2018-06-01T12:00:00.000000-'
            return p.format (host=urlparse (url).hostname, date=datetime.utcnow ().isoformat ())

        def logStats ():
            logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats)

        if urlparse (url).scheme not in self.SCHEME_WHITELIST:
            self.stats['ignored'] += 1
            logStats ()
            logger.warning ('scheme not whitelisted',
                    uuid='57e838de-4494-4316-ae98-cd3a2ebf541b')
            return

        dest = tempfile.NamedTemporaryFile (dir=self.tempdir,
                prefix=formatPrefix (self.prefix), suffix='.warc.gz',
                delete=False)
        # only the file's name is passed to the subprocess; close our own
        # handle, so descriptors do not pile up across fetches
        dest.close ()
        destpath = os.path.join (self.output, os.path.basename (dest.name))
        command = list (map (formatCommand, self.command))
        logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath)
        process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
                start_new_session=True)
        while True:
            data = await process.stdout.readline ()
            if not data:
                break
            data = json.loads (data)
            uuid = data.get ('uuid')
            if uuid == '8ee5e9c9-1130-4c5c-88ff-718508546e0c':
                # links extracted by the subprocess: apply the recursion
                # policy and queue every URL not seen before
                links = set (self.policy (map (removeFragment, data.get ('links', []))))
                links.difference_update (self.have)
                self.pending.update (links)
            elif uuid == '24d92d16-770e-4088-b769-4020e127a7ff':
                # per-job stats, merged into the aggregate stats
                for k in self.stats.keys ():
                    self.stats[k] += data.get (k, 0)
                logStats ()
        code = await process.wait ()
        if code == 0:
            # atomically move once finished
            os.rename (dest.name, destpath)
        else:
            self.stats['crashed'] += 1
            logStats ()

    def cancel (self):
        """ Gracefully cancel this job, waiting for existing workers to shut down """
        self.logger.info ('cancel',
                uuid='d58154c8-ec27-40f2-ab9e-e25c1b21cd88',
                pending=len (self.pending), have=len (self.have),
                running=len (self.running))
        self._quit = True

    async def run (self):
        def log ():
            self.logger.info ('recursing',
                    uuid='5b8498e4-868d-413c-a67e-004516b8452c',
                    pending=len (self.pending), have=len (self.have),
                    running=len (self.running))

        self.have = set ()
        self.pending = set ([self.url])

        while self.pending and not self._quit:
            # pending is a set, so this picks an arbitrary item, which is fine
            u = self.pending.pop ()
            self.have.add (u)
            t = asyncio.ensure_future (self.fetch (u))
            self.running.add (t)

            log ()

            if len (self.running) >= self.concurrency or not self.pending:
                done, pending = await asyncio.wait (self.running,
                        return_when=asyncio.FIRST_COMPLETED)
                self.running.difference_update (done)

        # wait for all tasks still running to finish
        await asyncio.gather (*self.running)
        self.running = set ()
        log ()