星際爭霸2 AI開發(持續更新)

準備

我的環境是python3.6,sc2包0.11.1
機器學習包下載鏈接:
地圖下載鏈接
pysc2是DeepMind開發的星際爭霸Ⅱ學習環境。 它是封裝星際爭霸Ⅱ機器學習API,同時也提供Python增強學習環境。
以神族為例編寫代碼,神族建築科技圖如下:

採礦

# -*- encoding: utf-8 -*-
'''
@File    :   __init__.py.py    
@Modify Time      @Author       @Desciption
------------      -------       -----------
2019/11/3 12:32   Jonas           None
'''

import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer


class SentdeBot(sc2.BotAI):
    async def on_step(self, iteration: int):
        await self.distribute_workers()


run_game(maps.get("AcidPlantLE"), [
    Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
],realtime = True)

注意
game_data.py的assert self.id != 0註釋掉
pixel_map.py的assert self.bits_per_pixel % 8 == 0, "Unsupported pixel density"註釋掉
否則會報錯

運行結果如下,农民開始採礦

可以正常採礦

建造农民和水晶塔

import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *


class SentdeBot(sc2.BotAI):
    async def on_step(self, iteration: int):
        await self.distribute_workers()
        await self.build_workers()
        await self.build_pylons()

    # 建造农民
    async def build_workers(self):
        # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
        for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
            # 是否有50晶體礦
            if self.can_afford(UnitTypeId.PROBE):
                await self.do(nexus.train(UnitTypeId.PROBE))

    ## 建造水晶
    async def build_pylons(self):
        ## 供應人口和現有人口之差小於5且水晶不是正在建造
        if self.supply_left<5 and not self.already_pending(UnitTypeId.PYLON):
            nexuses = self.units(UnitTypeId.NEXUS).ready
            if nexuses.exists:
                if self.can_afford(UnitTypeId.PYLON):
                    await self.build(UnitTypeId.PYLON,near=nexuses.first)

## 啟動遊戲
run_game(maps.get("AcidPlantLE"), [
    Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
],realtime = True)

運行結果如下,基地造农民,农民造水晶

收集氣體和開礦

代碼如下

import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *


class SentdeBot(sc2.BotAI):
    async def on_step(self, iteration: int):
        await self.distribute_workers()
        await self.build_workers()
        await self.build_pylons()
        await self.build_assimilators()
        await self.expand()

    # 建造农民
    async def build_workers(self):
        # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
        for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
            # 是否有50晶體礦
            if self.can_afford(UnitTypeId.PROBE):
                await self.do(nexus.train(UnitTypeId.PROBE))

    ## 建造水晶
    async def build_pylons(self):
        ## 供應人口和現有人口之差小於5且建築不是正在建造
        if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
            nexuses = self.units(UnitTypeId.NEXUS).ready
            if nexuses.exists:
                if self.can_afford(UnitTypeId.PYLON):
                    await self.build(UnitTypeId.PYLON, near=nexuses.first)

    ## 建造吸收廠
    async def build_assimilators(self):
        for nexus in self.units(UnitTypeId.NEXUS).ready:
            # 在瓦斯泉上建造吸收廠
            vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
            for vaspene in vaspenes:
                if not self.can_afford(UnitTypeId.ASSIMILATOR):
                    break
                worker = self.select_build_worker(vaspene.position)
                if worker is None:
                    break
                if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
                    await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))

    ## 開礦
    async def expand(self):
        if self.units(UnitTypeId.NEXUS).amount<3 and self.can_afford(UnitTypeId.NEXUS):
            await self.expand_now()

## 啟動遊戲
run_game(maps.get("AcidPlantLE"), [
    Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
], realtime=False)

run_game的realtime設置成False,可以在加速模式下運行遊戲。
運行效果如下:

可以建造吸收廠和開礦

建造軍隊

import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *


class SentdeBot(sc2.BotAI):
    async def on_step(self, iteration: int):
        await self.distribute_workers()
        await self.build_workers()
        await self.build_pylons()
        await self.build_assimilators()
        await self.expand()
        await self.offensive_force_buildings()
        await self.build_offensive_force()

    # 建造农民
    async def build_workers(self):
        # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
        for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
            # 是否有50晶體礦
            if self.can_afford(UnitTypeId.PROBE):
                await self.do(nexus.train(UnitTypeId.PROBE))

    ## 建造水晶
    async def build_pylons(self):
        ## 供應人口和現有人口之差小於5且建築不是正在建造
        if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
            nexuses = self.units(UnitTypeId.NEXUS).ready
            if nexuses.exists:
                if self.can_afford(UnitTypeId.PYLON):
                    await self.build(UnitTypeId.PYLON, near=nexuses.first)

    ## 建造吸收廠
    async def build_assimilators(self):
        for nexus in self.units(UnitTypeId.NEXUS).ready:
            # 在瓦斯泉上建造吸收廠
            vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
            for vaspene in vaspenes:
                if not self.can_afford(UnitTypeId.ASSIMILATOR):
                    break
                worker = self.select_build_worker(vaspene.position)
                if worker is None:
                    break
                if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
                    await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))

    ## 開礦
    async def expand(self):
        if self.units(UnitTypeId.NEXUS).amount<2 and self.can_afford(UnitTypeId.NEXUS):
            await self.expand_now()

    ## 建造進攻性建築
    async def offensive_force_buildings(self):
        if self.units(UnitTypeId.PYLON).ready.exists:
            pylon = self.units(UnitTypeId.PYLON).ready.random
            if self.units(UnitTypeId.PYLON).ready.exists:
                # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
                if self.units(UnitTypeId.GATEWAY).ready.exists:
                    if not self.units(UnitTypeId.CYBERNETICSCORE):
                        if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                            await self.build(UnitTypeId.CYBERNETICSCORE,near = pylon)
                # 否則建造折躍門
                else:
                    if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                        await self.build(UnitTypeId.GATEWAY,near=pylon)

    # 造兵
    async def build_offensive_force(self):
        # 無隊列化建造
        for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
            if self.can_afford(UnitTypeId.STALKER) and self.supply_left>0:
                await self.do(gw.train(UnitTypeId.STALKER))



## 啟動遊戲
run_game(maps.get("AcidPlantLE"), [
    Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Easy)
], realtime=False)

運行結果如下:

可以看到,我們建造了折躍門和控制核心並訓練了追獵者

控制部隊進攻

代碼如下


import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *
import random

class SentdeBot(sc2.BotAI):
    async def on_step(self, iteration: int):
        await self.distribute_workers()
        await self.build_workers()
        await self.build_pylons()
        await self.build_assimilators()
        await self.expand()
        await self.offensive_force_buildings()
        await self.build_offensive_force()
        await self.attack()

    # 建造农民
    async def build_workers(self):
        # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
        for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
            # 是否有50晶體礦
            if self.can_afford(UnitTypeId.PROBE):
                await self.do(nexus.train(UnitTypeId.PROBE))

    ## 建造水晶
    async def build_pylons(self):
        ## 供應人口和現有人口之差小於5且建築不是正在建造
        if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
            nexuses = self.units(UnitTypeId.NEXUS).ready
            if nexuses.exists:
                if self.can_afford(UnitTypeId.PYLON):
                    await self.build(UnitTypeId.PYLON, near=nexuses.first)

    ## 建造吸收廠
    async def build_assimilators(self):
        for nexus in self.units(UnitTypeId.NEXUS).ready:
            # 在瓦斯泉上建造吸收廠
            vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
            for vaspene in vaspenes:
                if not self.can_afford(UnitTypeId.ASSIMILATOR):
                    break
                worker = self.select_build_worker(vaspene.position)
                if worker is None:
                    break
                if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
                    await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))

    ## 開礦
    async def expand(self):
        if self.units(UnitTypeId.NEXUS).amount<3 and self.can_afford(UnitTypeId.NEXUS):
            await self.expand_now()

    ## 建造進攻性建築
    async def offensive_force_buildings(self):
        if self.units(UnitTypeId.PYLON).ready.exists:
            pylon = self.units(UnitTypeId.PYLON).ready.random
            # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
            if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
                if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                    await self.build(UnitTypeId.CYBERNETICSCORE,near = pylon)
            # 否則建造折躍門
            elif len(self.units(UnitTypeId.GATEWAY))<=3:
                if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                    await self.build(UnitTypeId.GATEWAY,near=pylon)

    ## 造兵
    async def build_offensive_force(self):
        # 無隊列化建造
        for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
            if self.can_afford(UnitTypeId.STALKER) and self.supply_left>0:
                await self.do(gw.train(UnitTypeId.STALKER))

    ## 尋找目標
    def find_target(self,state):
        if len(self.known_enemy_units)>0:
            # 隨機選取敵方單位
            return random.choice(self.known_enemy_units)
        elif len(self.known_enemy_units)>0:
            # 隨機選取敵方建築
            return random.choice(self.known_enemy_structures)
        else:
            # 返回敵方出生點位
            return self.enemy_start_locations[0]

    ## 進攻
    async def attack(self):
        # 追獵者數量超過15個開始進攻
        if self.units(UnitTypeId.STALKER).amount>15:
            for s in self.units(UnitTypeId.STALKER).idle:
                await self.do(s.attack(self.find_target(self.state)))

        # 防衛模式:視野範圍內存在敵人,開始攻擊
        if self.units(UnitTypeId.STALKER).amount>5:
            if len(self.known_enemy_units)>0:
                for s in self.units(UnitTypeId.STALKER).idle:
                    await self.do(s.attack(random.choice(self.known_enemy_units)))

## 啟動遊戲
run_game(maps.get("AcidPlantLE"), [
    Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Medium)
], realtime=False)

運行結果如下

可以看到,4個折躍門訓練追獵者並發動進攻。

擊敗困難電腦

我們目前的代碼只能擊敗中等和簡單電腦,那麼如何擊敗困難電腦呢?
代碼如下


import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *
import random


class SentdeBot(sc2.BotAI):
    def __init__(self):
        # 經過計算,每分鐘大約165迭代次數
        self.ITERATIONS_PER_MINUTE = 165
        # 最大农民數量
        self.MAX_WORKERS = 65

    async def on_step(self, iteration: int):
        self.iteration = iteration
        await self.distribute_workers()
        await self.build_workers()
        await self.build_pylons()
        await self.build_assimilators()
        await self.expand()
        await self.offensive_force_buildings()
        await self.build_offensive_force()
        await self.attack()

    # 建造农民
    async def build_workers(self):
        # 星靈樞鈕*16(一個基地配備16個农民)大於農民數量並且現有农民數量小於MAX_WORKERS
        if len(self.units(UnitTypeId.NEXUS))*16>len(self.units(UnitTypeId.PROBE)) and len(self.units(UnitTypeId.PROBE))<self.MAX_WORKERS:
                # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
                for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                    # 是否有50晶體礦建造农民
                    if self.can_afford(UnitTypeId.PROBE):
                        await self.do(nexus.train(UnitTypeId.PROBE))

    ## 建造水晶
    async def build_pylons(self):
        ## 供應人口和現有人口之差小於5且建築不是正在建造
        if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
            nexuses = self.units(UnitTypeId.NEXUS).ready
            if nexuses.exists:
                if self.can_afford(UnitTypeId.PYLON):
                    await self.build(UnitTypeId.PYLON, near=nexuses.first)

    ## 建造吸收廠
    async def build_assimilators(self):
        for nexus in self.units(UnitTypeId.NEXUS).ready:
            # 在瓦斯泉上建造吸收廠
            vaspenes = self.state.vespene_geyser.closer_than(15.0,nexus)
            for vaspene in vaspenes:
                if not self.can_afford(UnitTypeId.ASSIMILATOR):
                    break
                worker = self.select_build_worker(vaspene.position)
                if worker is None:
                    break
                if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0,vaspene).exists:
                    await self.do(worker.build(UnitTypeId.ASSIMILATOR,vaspene))

    ## 開礦
    async def expand(self):
        # (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
        if self.units(UnitTypeId.NEXUS).amount<self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(UnitTypeId.NEXUS):
            await self.expand_now()

    ## 建造進攻性建築
    async def offensive_force_buildings(self):
        print(self.iteration / self.ITERATIONS_PER_MINUTE)
        if self.units(UnitTypeId.PYLON).ready.exists:
            pylon = self.units(UnitTypeId.PYLON).ready.random
            # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
            if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
                if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                    await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
            # 否則建造折躍門
            # (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
            elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                    await self.build(UnitTypeId.GATEWAY, near=pylon)
            # 控制核心存在的情況下建造星門
            if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                    if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
                        await self.build(UnitTypeId.STARGATE, near=pylon)

    ## 造兵
    async def build_offensive_force(self):
        # 無隊列化建造
        for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
            if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:

                if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
                    await self.do(gw.train(UnitTypeId.STALKER))

        for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
            if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
                await self.do(sg.train(UnitTypeId.VOIDRAY))

    ## 尋找目標
    def find_target(self,state):
        if len(self.known_enemy_units)>0:
            # 隨機選取敵方單位
            return random.choice(self.known_enemy_units)
        elif len(self.known_enemy_units)>0:
            # 隨機選取敵方建築
            return random.choice(self.known_enemy_structures)
        else:
            # 返回敵方出生點位
            return self.enemy_start_locations[0]

    ## 進攻
    async def attack(self):
        # {UNIT: [n to fight, n to defend]}
        aggressive_units = {UnitTypeId.STALKER: [15, 5],
                            UnitTypeId.VOIDRAY: [8, 3]}

        for UNIT in aggressive_units:
            # 攻擊模式
            if self.units(UNIT).amount > aggressive_units[UNIT][0] and self.units(UNIT).amount > aggressive_units[UNIT][
                1]:
                for s in self.units(UNIT).idle:
                    await self.do(s.attack(self.find_target(self.state)))
            # 防衛模式
            elif self.units(UNIT).amount > aggressive_units[UNIT][1]:
                if len(self.known_enemy_units) > 0:
                    for s in self.units(UNIT).idle:
                        await self.do(s.attack(random.choice(self.known_enemy_units)))
## 啟動遊戲
run_game(maps.get("AcidPlantLE"), [
    Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Hard)
], realtime=False)

運行結果如下

可以看到,擊敗了困難人族電腦,但是電腦選擇了rush戰術,我們寫得AI腳本會輸掉遊戲。顯然,這不是最佳方案。
“只有AI才能拯救我的勝率”,請看下文。

採集地圖數據

這次我們只造一個折躍門,全力通過星門造虛空光輝艦
修改offensive_force_buildings(self)方法的判斷

elif len(self.units(GATEWAY)) < 1:
                if self.can_afford(GATEWAY) and not self.already_pending(GATEWAY):
                    await self.build(GATEWAY, near=pylon)

註釋或者刪除build_offensive_force(self)的建造追獵者的代碼

        ## 造兵
    async def build_offensive_force(self):
        # 無隊列化建造
        # for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
        #     if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:
        #
        #         if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
        #             await self.do(gw.train(UnitTypeId.STALKER))

        for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
            if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
                await self.do(sg.train(UnitTypeId.VOIDRAY))

attack(self)中的aggressive_units註釋掉Stalker
導入numpy和cv2庫

game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)

建立以地圖Heigt為行,Width為列的三維矩陣

for nexus in self.units(NEXUS):
            nex_pos = nexus.position
            print(nex_pos)
            cv2.circle(game_data, (int(nex_pos[0]), int(nex_pos[1])), 10, (0, 255, 0), -1)  # BGR

遍歷星靈樞紐,獲取下一個位置,畫圓,circle(承載圓的img, 圓心, 半徑, 顏色, thickness=-1表示填充)
接下來我們要垂直翻轉三維矩陣,因為我們建立的矩陣左上角是原點(0,0),縱坐標向下延申,橫坐標向右延申。翻轉之後就成了正常的坐標系。

flipped = cv2.flip(game_data, 0)

圖像縮放,達到可視化最佳。

        resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)
        cv2.imshow('Intel', resized)
        cv2.waitKey(1)

至此,完整代碼如下

import sc2
from sc2 import run_game, maps, Race, Difficulty
from sc2.player import Bot, Computer
from sc2.constants import *
import random
import numpy as np
import cv2


class SentdeBot(sc2.BotAI):
    def __init__(self):
        # 經過計算,每分鐘大約165迭代次數
        self.ITERATIONS_PER_MINUTE = 165
        # 最大农民數量
        self.MAX_WORKERS = 65

    async def on_step(self, iteration: int):
        self.iteration = iteration
        await self.distribute_workers()
        await self.build_workers()
        await self.build_pylons()
        await self.build_assimilators()
        await self.expand()
        await self.offensive_force_buildings()
        await self.build_offensive_force()
        await self.intel()
        await self.attack()

    async def intel(self):
        # 根據地圖建立的三維矩陣
        game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
        for nexus in self.units(UnitTypeId.NEXUS):
            nex_pos = nexus.position
            # circle(承載圓的img, 圓心, 半徑, 顏色, thickness=-1表示填充)
            # 記錄星靈樞紐的位置
            cv2.circle(game_data, (int(nex_pos[0]), int(nex_pos[1])), 10, (0, 255, 0), -1)
        # 圖像翻轉垂直鏡像
        flipped = cv2.flip(game_data, 0)
        # 圖像縮放
        # cv2.resize(原圖像,輸出圖像的大小,width方向的縮放比例,height方向縮放的比例)
        resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)
        cv2.imshow('Intel', resized)

        # cv2.waitKey(每Xms刷新圖像)
        cv2.waitKey(1)

    # 建造农民
    async def build_workers(self):
        # 星靈樞鈕*16(一個基地配備16個农民)大於農民數量並且現有农民數量小於MAX_WORKERS
        if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
                self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
            # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
            for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                # 是否有50晶體礦建造农民
                if self.can_afford(UnitTypeId.PROBE):
                    await self.do(nexus.train(UnitTypeId.PROBE))

    ## 建造水晶
    async def build_pylons(self):
        ## 供應人口和現有人口之差小於5且建築不是正在建造
        if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
            nexuses = self.units(UnitTypeId.NEXUS).ready
            if nexuses.exists:
                if self.can_afford(UnitTypeId.PYLON):
                    await self.build(UnitTypeId.PYLON, near=nexuses.first)

    ## 建造吸收廠
    async def build_assimilators(self):
        for nexus in self.units(UnitTypeId.NEXUS).ready:
            # 在瓦斯泉上建造吸收廠
            vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
            for vaspene in vaspenes:
                if not self.can_afford(UnitTypeId.ASSIMILATOR):
                    break
                worker = self.select_build_worker(vaspene.position)
                if worker is None:
                    break
                if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
                    await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))

    ## 開礦
    async def expand(self):
        # (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
        if self.units(UnitTypeId.NEXUS).amount < self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(
                UnitTypeId.NEXUS):
            await self.expand_now()

    ## 建造進攻性建築
    async def offensive_force_buildings(self):
        print(self.iteration / self.ITERATIONS_PER_MINUTE)
        if self.units(UnitTypeId.PYLON).ready.exists:
            pylon = self.units(UnitTypeId.PYLON).ready.random
            # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
            if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
                if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                    await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
            # 否則建造折躍門
            # (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
            # elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
            elif len(self.units(UnitTypeId.GATEWAY)) < 1:
                if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                    await self.build(UnitTypeId.GATEWAY, near=pylon)
            # 控制核心存在的情況下建造星門
            if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                    if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
                        await self.build(UnitTypeId.STARGATE, near=pylon)

    ## 造兵
    async def build_offensive_force(self):
        # 無隊列化建造
        for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
            if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
                await self.do(sg.train(UnitTypeId.VOIDRAY))

    ## 尋找目標
    def find_target(self, state):
        if len(self.known_enemy_units) > 0:
            # 隨機選取敵方單位
            return random.choice(self.known_enemy_units)
        elif len(self.known_enemy_units) > 0:
            # 隨機選取敵方建築
            return random.choice(self.known_enemy_structures)
        else:
            # 返回敵方出生點位
            return self.enemy_start_locations[0]

    ## 進攻
    async def attack(self):
        # {UNIT: [n to fight, n to defend]}
        aggressive_units = {UnitTypeId.VOIDRAY: [8, 3]}

        for UNIT in aggressive_units:
            # 攻擊模式
            if self.units(UNIT).amount > aggressive_units[UNIT][0] and self.units(UNIT).amount > aggressive_units[UNIT][1]:
                for s in self.units(UNIT).idle:
                    await self.do(s.attack(self.find_target(self.state)))
            # 防衛模式
            elif self.units(UNIT).amount > aggressive_units[UNIT][1]:
                if len(self.known_enemy_units) > 0:
                    for s in self.units(UNIT).idle:
                        await self.do(s.attack(random.choice(self.known_enemy_units)))


## 啟動遊戲
run_game(maps.get("AcidPlantLE"), [
    Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Hard)
], realtime=False)

運行結果如下

採集到了地圖位置。

偵察

在intel(self)里創建一個字典draw_dict,UnitTypeId作為key,半徑和顏色是value


        draw_dict = {
            UnitTypeId.NEXUS: [15, (0, 255, 0)],
            UnitTypeId.PYLON: [3, (20, 235, 0)],
            UnitTypeId.PROBE: [1, (55, 200, 0)],
            UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
            UnitTypeId.GATEWAY: [3, (200, 100, 0)],
            UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
            UnitTypeId.STARGATE: [5, (255, 0, 0)],
            UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],

            UnitTypeId.VOIDRAY: [3, (255, 100, 0)]
        }

迭代同上

for unit_type in draw_dict:
            for unit in self.units(unit_type).ready:
                pos = unit.position
                cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)

存儲三族的主基地名稱(星靈樞紐,指揮中心,孵化場),刻畫敵方建築。

# 主基地名稱
        main_base_names = ["nexus", "supplydepot", "hatchery"]
        # 記錄敵方基地位置
        for enemy_building in self.known_enemy_structures:
            pos = enemy_building.position
            if enemy_building.name.lower() not in main_base_names:
                cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
        for enemy_building in self.known_enemy_structures:
            pos = enemy_building.position
            if enemy_building.name.lower() in main_base_names:
                cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)

刻畫敵方單位,如果是农民畫得小些,其他單位則畫大些。

        for enemy_unit in self.known_enemy_units:

            if not enemy_unit.is_structure:
                worker_names = ["probe", "scv", "drone"]
                # if that unit is a PROBE, SCV, or DRONE... it's a worker
                pos = enemy_unit.position
                if enemy_unit.name.lower() in worker_names:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
                else:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)

在offensive_force_buildings(self)方法中添加建造机械台

            if self.units(CYBERNETICSCORE).ready.exists:
                if len(self.units(ROBOTICSFACILITY)) < 1:
                    if self.can_afford(ROBOTICSFACILITY) and not self.already_pending(ROBOTICSFACILITY):
                        await self.build(ROBOTICSFACILITY, near=pylon)

創建scout(),訓練Observer

async def scout(self):
        if len(self.units(OBSERVER)) > 0:
            scout = self.units(OBSERVER)[0]
            if scout.is_idle:
                enemy_location = self.enemy_start_locations[0]
                move_to = self.random_location_variance(enemy_location)
                print(move_to)
                await self.do(scout.move(move_to))

        else:
            for rf in self.units(ROBOTICSFACILITY).ready.noqueue:
                if self.can_afford(OBSERVER) and self.supply_left > 0:
                    await self.do(rf.train(OBSERVER))

生成隨機位置,很簡單。意思是橫坐標累計遞增-0.2和0.2倍的橫坐標,限制條件為如果x超過橫坐標,那麼就是橫坐標最大值。
縱坐標同理。

    def random_location_variance(self, enemy_start_location):
        x = enemy_start_location[0]
        y = enemy_start_location[1]

        x += ((random.randrange(-20, 20))/100) * enemy_start_location[0]
        y += ((random.randrange(-20, 20))/100) * enemy_start_location[1]

        if x < 0:
            x = 0
        if y < 0:
            y = 0
        if x > self.game_info.map_size[0]:
            x = self.game_info.map_size[0]
        if y > self.game_info.map_size[1]:
            y = self.game_info.map_size[1]

        go_to = position.Point2(position.Pointlike((x,y)))
        return go_to

完整代碼如下

# -*- encoding: utf-8 -*-
'''
@File    :   demo.py
@Modify Time      @Author       @Desciption
------------      -------       -----------
2019/11/3 12:32   Jonas           None
'''

import sc2
from sc2 import run_game, maps, Race, Difficulty, position
from sc2.player import Bot, Computer
from sc2.constants import *
import random
import numpy as np
import cv2


class SentdeBot(sc2.BotAI):
    def __init__(self):
        # 經過計算,每分鐘大約165迭代次數
        self.ITERATIONS_PER_MINUTE = 165
        # 最大农民數量
        self.MAX_WORKERS = 50

    async def on_step(self, iteration: int):
        self.iteration = iteration
        await self.scout()
        await self.distribute_workers()
        await self.build_workers()
        await self.build_pylons()
        await self.build_assimilators()
        await self.expand()
        await self.offensive_force_buildings()
        await self.build_offensive_force()
        await self.intel()
        await self.attack()

    ## 偵察
    async def scout(self):
        if len(self.units(UnitTypeId.OBSERVER)) > 0:
            scout = self.units(UnitTypeId.OBSERVER)[0]
            if scout.is_idle:
                enemy_location = self.enemy_start_locations[0]
                move_to = self.random_location_variance(enemy_location)
                print(move_to)
                await self.do(scout.move(move_to))

        else:
            for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
                if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
                    await self.do(rf.train(UnitTypeId.OBSERVER))

    async def intel(self):
        game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)

        # UnitTypeId作為key,半徑和顏色是value
        draw_dict = {
            UnitTypeId.NEXUS: [15, (0, 255, 0)],
            UnitTypeId.PYLON: [3, (20, 235, 0)],
            UnitTypeId.PROBE: [1, (55, 200, 0)],
            UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
            UnitTypeId.GATEWAY: [3, (200, 100, 0)],
            UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
            UnitTypeId.STARGATE: [5, (255, 0, 0)],
            UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],

            UnitTypeId.VOIDRAY: [3, (255, 100, 0)],
            # OBSERVER: [3, (255, 255, 255)],
        }

        for unit_type in draw_dict:
            for unit in self.units(unit_type).ready:
                pos = unit.position
                cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)

        # 主基地名稱
        main_base_names = ["nexus", "supplydepot", "hatchery"]
        # 記錄敵方基地位置
        for enemy_building in self.known_enemy_structures:
            pos = enemy_building.position
            # 不是主基地建築,畫小一些
            if enemy_building.name.lower() not in main_base_names:
                cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
        for enemy_building in self.known_enemy_structures:
            pos = enemy_building.position
            if enemy_building.name.lower() in main_base_names:
                cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)

        for enemy_unit in self.known_enemy_units:

            if not enemy_unit.is_structure:
                worker_names = ["probe", "scv", "drone"]
                # if that unit is a PROBE, SCV, or DRONE... it's a worker
                pos = enemy_unit.position
                if enemy_unit.name.lower() in worker_names:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
                else:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)

        for obs in self.units(UnitTypeId.OBSERVER).ready:
            pos = obs.position
            cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1)

        # flip horizontally to make our final fix in visual representation:
        flipped = cv2.flip(game_data, 0)
        resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)

        cv2.imshow('Intel', resized)
        cv2.waitKey(1)

    def random_location_variance(self, enemy_start_location):
        x = enemy_start_location[0]
        y = enemy_start_location[1]

        x += ((random.randrange(-20, 20)) / 100) * enemy_start_location[0]
        y += ((random.randrange(-20, 20)) / 100) * enemy_start_location[1]

        if x < 0:
            x = 0
        if y < 0:
            y = 0
        if x > self.game_info.map_size[0]:
            x = self.game_info.map_size[0]
        if y > self.game_info.map_size[1]:
            y = self.game_info.map_size[1]

        go_to = position.Point2(position.Pointlike((x, y)))
        return go_to

    # 建造农民
    async def build_workers(self):
        # 星靈樞鈕*16(一個基地配備16個农民)大於農民數量並且現有农民數量小於MAX_WORKERS
        if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
                self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
            # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
            for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                # 是否有50晶體礦建造农民
                if self.can_afford(UnitTypeId.PROBE):
                    await self.do(nexus.train(UnitTypeId.PROBE))

    ## 建造水晶
    async def build_pylons(self):
        ## 供應人口和現有人口之差小於5且建築不是正在建造
        if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
            nexuses = self.units(UnitTypeId.NEXUS).ready
            if nexuses.exists:
                if self.can_afford(UnitTypeId.PYLON):
                    await self.build(UnitTypeId.PYLON, near=nexuses.first)

    ## 建造吸收廠
    async def build_assimilators(self):
        for nexus in self.units(UnitTypeId.NEXUS).ready:
            # 在瓦斯泉上建造吸收廠
            vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
            for vaspene in vaspenes:
                if not self.can_afford(UnitTypeId.ASSIMILATOR):
                    break
                worker = self.select_build_worker(vaspene.position)
                if worker is None:
                    break
                if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
                    await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))

    ## 開礦
    async def expand(self):
        # (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
        if self.units(UnitTypeId.NEXUS).amount < self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(
                UnitTypeId.NEXUS):
            await self.expand_now()

    ## 建造進攻性建築
    async def offensive_force_buildings(self):
        print(self.iteration / self.ITERATIONS_PER_MINUTE)
        if self.units(UnitTypeId.PYLON).ready.exists:
            pylon = self.units(UnitTypeId.PYLON).ready.random
            # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
            if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
                if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                    await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
            # 否則建造折躍門
            # (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
            # elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
            elif len(self.units(UnitTypeId.GATEWAY)) < 1:
                if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                    await self.build(UnitTypeId.GATEWAY, near=pylon)
            # 控制核心存在的情況下建造机械台
            if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                if len(self.units(UnitTypeId.ROBOTICSFACILITY)) < 1:
                    if self.can_afford(UnitTypeId.ROBOTICSFACILITY) and not self.already_pending(
                            UnitTypeId.ROBOTICSFACILITY):
                        await self.build(UnitTypeId.ROBOTICSFACILITY, near=pylon)

            # 控制核心存在的情況下建造星門
            if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                    if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
                        await self.build(UnitTypeId.STARGATE, near=pylon)

    ## 造兵
    async def build_offensive_force(self):
        # 無隊列化建造
        # for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
        #     if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:
        #
        #         if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
        #             await self.do(gw.train(UnitTypeId.STALKER))

        for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
            if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
                await self.do(sg.train(UnitTypeId.VOIDRAY))

    ## 尋找目標
    def find_target(self, state):
        if len(self.known_enemy_units) > 0:
            # 隨機選取敵方單位
            return random.choice(self.known_enemy_units)
        elif len(self.known_enemy_units) > 0:
            # 隨機選取敵方建築
            return random.choice(self.known_enemy_structures)
        else:
            # 返回敵方出生點位
            return self.enemy_start_locations[0]

    ## 進攻
    async def attack(self):
        # {UNIT: [n to fight, n to defend]}
        aggressive_units = {UnitTypeId.VOIDRAY: [8, 3]}

        for UNIT in aggressive_units:
            # 攻擊模式
            if self.units(UNIT).amount > aggressive_units[UNIT][0] and self.units(UNIT).amount > aggressive_units[UNIT][
                1]:
                for s in self.units(UNIT).idle:
                    await self.do(s.attack(self.find_target(self.state)))
            # 防衛模式
            elif self.units(UNIT).amount > aggressive_units[UNIT][1]:
                if len(self.known_enemy_units) > 0:
                    for s in self.units(UNIT).idle:
                        await self.do(s.attack(random.choice(self.known_enemy_units)))


## 啟動遊戲
run_game(maps.get("AcidPlantLE"), [
    Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Hard)
], realtime=False)

運行結果如下,紅色和粉紅色是敵方單位。

創建訓練數據

統計資源、人口和軍隊人口比,在intel方法添加如下代碼

        # 追蹤資源、人口和軍隊人口比
        line_max = 50
        mineral_ratio = self.minerals / 1500
        if mineral_ratio > 1.0:
            mineral_ratio = 1.0

        vespene_ratio = self.vespene / 1500
        if vespene_ratio > 1.0:
            vespene_ratio = 1.0

        population_ratio = self.supply_left / self.supply_cap
        if population_ratio > 1.0:
            population_ratio = 1.0

        plausible_supply = self.supply_cap / 200.0

        military_weight = len(self.units(UnitTypeId.VOIDRAY)) / (self.supply_cap - self.supply_left)
        if military_weight > 1.0:
            military_weight = 1.0

        # 农民/人口      worker/supply ratio
        cv2.line(game_data, (0, 19), (int(line_max * military_weight), 19), (250, 250, 200), 3)
        # 人口/200    plausible supply (supply/200.0)
        cv2.line(game_data, (0, 15), (int(line_max * plausible_supply), 15), (220, 200, 200), 3)
        # (人口-現有人口)/人口  population ratio (supply_left/supply)
        cv2.line(game_data, (0, 11), (int(line_max * population_ratio), 11), (150, 150, 150), 3)
        # 氣體/1500   gas/1500
        cv2.line(game_data, (0, 7), (int(line_max * vespene_ratio), 7), (210, 200, 0), 3)
        # 晶體礦/1500  minerals minerals/1500
        cv2.line(game_data, (0, 3), (int(line_max * mineral_ratio), 3), (0, 255, 25), 3)

運行結果如下,左下角自上而下依次是“农民/人口”,“人口/200”,“(人口-現有人口)/人口”,“氣體/1500”,“晶體礦/1500”

採集進攻行為數據,在attack方法中加入如下代碼

        if len(self.units(UnitTypeId.VOIDRAY).idle) > 0:
            choice = random.randrange(0, 4)
            target = False
            if self.iteration > self.do_something_after:
                if choice == 0:
                    # 什麼都不做
                    wait = random.randrange(20, 165)
                    self.do_something_after = self.iteration + wait

                elif choice == 1:
                    # 攻擊離星靈樞紐最近的單位
                    if len(self.known_enemy_units) > 0:
                        target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))

                elif choice == 2:
                    # 攻擊敵方建築
                    if len(self.known_enemy_structures) > 0:
                        target = random.choice(self.known_enemy_structures)

                elif choice == 3:
                    # 攻擊敵方出生位置
                    target = self.enemy_start_locations[0]

                if target:
                    for vr in self.units(UnitTypeId.VOIDRAY).idle:
                        await self.do(vr.attack(target))
                y = np.zeros(4)
                y[choice] = 1
                print(y)
                self.train_data.append([y, self.flipped])

輸出如下結果

···
[1. 0. 0. 0.]
[0. 0. 1. 0.]
[0. 0. 0. 1.]
[0. 0. 1. 0.]
[1. 0. 0. 0.]
···

為了使用self.flipped = cv2.flip(game_data, 0),修改

        flipped = cv2.flip(game_data, 0)
        resized = cv2.resize(flipped, dsize=None, fx=2, fy=2)

        self.flipped = cv2.flip(game_data, 0)
        resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)

init 方法添加do_something_after和train_data

    def __init__(self):
        self.ITERATIONS_PER_MINUTE = 165
        self.MAX_WORKERS = 50
        self.do_something_after = 0
        self.train_data = []

採集攻擊數據的時候不需要畫圖,我們在類前加HEADLESS = False,intel方法代碼修改如下

        self.flipped = cv2.flip(game_data, 0)

        if not HEADLESS:
            resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
            cv2.imshow('Intel', resized)
            cv2.waitKey(1)

加入on_end方法,只存儲勝利的數據,在和代碼同級目錄新建train_data文件夾

    def on_end(self, game_result):
        print('--- on_end called ---')
        print(game_result)

        if game_result == Result.Victory:
            np.save("train_data/{}.npy".format(str(int(time.time()))), np.array(self.train_data))

完整代碼如下

import os
import time

import sc2
from sc2 import run_game, maps, Race, Difficulty, position, Result
from sc2.player import Bot, Computer
from sc2.constants import *
import random
import numpy as np
import cv2

HEADLESS = True
# os.environ["SC2PATH"] = 'F:\StarCraft II'

class SentdeBot(sc2.BotAI):
    def __init__(self):
        # 經過計算,每分鐘大約165迭代次數
        self.ITERATIONS_PER_MINUTE = 165
        # 最大农民數量
        self.MAX_WORKERS = 50
        self.do_something_after = 0
        self.train_data = []

    def on_end(self, game_result):
        print('--- on_end called ---')
        print(game_result)

        if game_result == Result.Victory:
            np.save("train_data/{}.npy".format(str(int(time.time()))), np.array(self.train_data))

    async def on_step(self, iteration: int):
        self.iteration = iteration
        await self.scout()
        await self.distribute_workers()
        await self.build_workers()
        await self.build_pylons()
        await self.build_assimilators()
        await self.expand()
        await self.offensive_force_buildings()
        await self.build_offensive_force()
        await self.intel()
        await self.attack()

    ## 偵察
    async def scout(self):
        if len(self.units(UnitTypeId.OBSERVER)) > 0:
            scout = self.units(UnitTypeId.OBSERVER)[0]
            if scout.is_idle:
                enemy_location = self.enemy_start_locations[0]
                move_to = self.random_location_variance(enemy_location)
                print(move_to)
                await self.do(scout.move(move_to))

        else:
            for rf in self.units(UnitTypeId.ROBOTICSFACILITY).ready.noqueue:
                if self.can_afford(UnitTypeId.OBSERVER) and self.supply_left > 0:
                    await self.do(rf.train(UnitTypeId.OBSERVER))

    async def intel(self):
        game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)

        # UnitTypeId作為key,半徑和顏色是value
        draw_dict = {
            UnitTypeId.NEXUS: [15, (0, 255, 0)],
            UnitTypeId.PYLON: [3, (20, 235, 0)],
            UnitTypeId.PROBE: [1, (55, 200, 0)],
            UnitTypeId.ASSIMILATOR: [2, (55, 200, 0)],
            UnitTypeId.GATEWAY: [3, (200, 100, 0)],
            UnitTypeId.CYBERNETICSCORE: [3, (150, 150, 0)],
            UnitTypeId.STARGATE: [5, (255, 0, 0)],
            UnitTypeId.ROBOTICSFACILITY: [5, (215, 155, 0)],

            UnitTypeId.VOIDRAY: [3, (255, 100, 0)],
            # OBSERVER: [3, (255, 255, 255)],
        }

        for unit_type in draw_dict:
            for unit in self.units(unit_type).ready:
                pos = unit.position
                cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)

        # 主基地名稱
        main_base_names = ["nexus", "supplydepot", "hatchery"]
        # 記錄敵方基地位置
        for enemy_building in self.known_enemy_structures:
            pos = enemy_building.position
            # 不是主基地建築,畫小一些
            if enemy_building.name.lower() not in main_base_names:
                cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
        for enemy_building in self.known_enemy_structures:
            pos = enemy_building.position
            if enemy_building.name.lower() in main_base_names:
                cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)

        for enemy_unit in self.known_enemy_units:

            if not enemy_unit.is_structure:
                worker_names = ["probe", "scv", "drone"]
                # if that unit is a PROBE, SCV, or DRONE... it's a worker
                pos = enemy_unit.position
                if enemy_unit.name.lower() in worker_names:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
                else:
                    cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)

        for obs in self.units(UnitTypeId.OBSERVER).ready:
            pos = obs.position
            cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1)


        # 追蹤資源、人口和軍隊人口比
        line_max = 50
        mineral_ratio = self.minerals / 1500
        if mineral_ratio > 1.0:
            mineral_ratio = 1.0

        vespene_ratio = self.vespene / 1500
        if vespene_ratio > 1.0:
            vespene_ratio = 1.0

        population_ratio = self.supply_left / self.supply_cap
        if population_ratio > 1.0:
            population_ratio = 1.0

        plausible_supply = self.supply_cap / 200.0

        military_weight = len(self.units(UnitTypeId.VOIDRAY)) / (self.supply_cap - self.supply_left)
        if military_weight > 1.0:
            military_weight = 1.0

        # 农民/人口      worker/supply ratio
        cv2.line(game_data, (0, 19), (int(line_max * military_weight), 19), (250, 250, 200), 3)
        # 人口/200    plausible supply (supply/200.0)
        cv2.line(game_data, (0, 15), (int(line_max * plausible_supply), 15), (220, 200, 200), 3)
        # (人口-現有人口)/人口  population ratio (supply_left/supply)
        cv2.line(game_data, (0, 11), (int(line_max * population_ratio), 11), (150, 150, 150), 3)
        # 氣體/1500   gas/1500
        cv2.line(game_data, (0, 7), (int(line_max * vespene_ratio), 7), (210, 200, 0), 3)
        # 晶體礦/1500  minerals minerals/1500
        cv2.line(game_data, (0, 3), (int(line_max * mineral_ratio), 3), (0, 255, 25), 3)




        # flip horizontally to make our final fix in visual representation:
        self.flipped = cv2.flip(game_data, 0)

        if HEADLESS:
            resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)

            cv2.imshow('Intel', resized)
            cv2.waitKey(1)

    def random_location_variance(self, enemy_start_location):
        x = enemy_start_location[0]
        y = enemy_start_location[1]

        x += ((random.randrange(-20, 20)) / 100) * enemy_start_location[0]
        y += ((random.randrange(-20, 20)) / 100) * enemy_start_location[1]

        if x < 0:
            x = 0
        if y < 0:
            y = 0
        if x > self.game_info.map_size[0]:
            x = self.game_info.map_size[0]
        if y > self.game_info.map_size[1]:
            y = self.game_info.map_size[1]

        go_to = position.Point2(position.Pointlike((x, y)))
        return go_to

    # 建造农民
    async def build_workers(self):
        # 星靈樞鈕*16(一個基地配備16個农民)大於農民數量並且現有农民數量小於MAX_WORKERS
        if len(self.units(UnitTypeId.NEXUS)) * 16 > len(self.units(UnitTypeId.PROBE)) and len(
                self.units(UnitTypeId.PROBE)) < self.MAX_WORKERS:
            # 星靈樞紐(NEXUS)無隊列建造,可以提高晶體礦的利用率,不至於佔用資源
            for nexus in self.units(UnitTypeId.NEXUS).ready.noqueue:
                # 是否有50晶體礦建造农民
                if self.can_afford(UnitTypeId.PROBE):
                    await self.do(nexus.train(UnitTypeId.PROBE))

    ## 建造水晶
    async def build_pylons(self):
        ## 供應人口和現有人口之差小於5且建築不是正在建造
        if self.supply_left < 5 and not self.already_pending(UnitTypeId.PYLON):
            nexuses = self.units(UnitTypeId.NEXUS).ready
            if nexuses.exists:
                if self.can_afford(UnitTypeId.PYLON):
                    await self.build(UnitTypeId.PYLON, near=nexuses.first)

    ## 建造吸收廠
    async def build_assimilators(self):
        for nexus in self.units(UnitTypeId.NEXUS).ready:
            # 在瓦斯泉上建造吸收廠
            vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
            for vaspene in vaspenes:
                if not self.can_afford(UnitTypeId.ASSIMILATOR):
                    break
                worker = self.select_build_worker(vaspene.position)
                if worker is None:
                    break
                if not self.units(UnitTypeId.ASSIMILATOR).closer_than(1.0, vaspene).exists:
                    await self.do(worker.build(UnitTypeId.ASSIMILATOR, vaspene))

    ## 開礦
    async def expand(self):
        # (self.iteration / self.ITERATIONS_PER_MINUTE)是一個緩慢遞增的值,動態開礦
        if self.units(UnitTypeId.NEXUS).amount < self.iteration / self.ITERATIONS_PER_MINUTE and self.can_afford(
                UnitTypeId.NEXUS):
            await self.expand_now()

    ## 建造進攻性建築
    async def offensive_force_buildings(self):
        # print(self.iteration / self.ITERATIONS_PER_MINUTE)
        if self.units(UnitTypeId.PYLON).ready.exists:
            pylon = self.units(UnitTypeId.PYLON).ready.random
            # 根據神族建築科技圖,折躍門建造過後才可以建造控制核心
            if self.units(UnitTypeId.GATEWAY).ready.exists and not self.units(UnitTypeId.CYBERNETICSCORE):
                if self.can_afford(UnitTypeId.CYBERNETICSCORE) and not self.already_pending(UnitTypeId.CYBERNETICSCORE):
                    await self.build(UnitTypeId.CYBERNETICSCORE, near=pylon)
            # 否則建造折躍門
            # (self.iteration / self.ITERATIONS_PER_MINUTE)/2 是一個緩慢遞增的值
            # elif len(self.units(UnitTypeId.GATEWAY)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
            elif len(self.units(UnitTypeId.GATEWAY)) < 1:
                if self.can_afford(UnitTypeId.GATEWAY) and not self.already_pending(UnitTypeId.GATEWAY):
                    await self.build(UnitTypeId.GATEWAY, near=pylon)
            # 控制核心存在的情況下建造机械台
            if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                if len(self.units(UnitTypeId.ROBOTICSFACILITY)) < 1:
                    if self.can_afford(UnitTypeId.ROBOTICSFACILITY) and not self.already_pending(
                            UnitTypeId.ROBOTICSFACILITY):
                        await self.build(UnitTypeId.ROBOTICSFACILITY, near=pylon)

            # 控制核心存在的情況下建造星門
            if self.units(UnitTypeId.CYBERNETICSCORE).ready.exists:
                if len(self.units(UnitTypeId.STARGATE)) < ((self.iteration / self.ITERATIONS_PER_MINUTE) / 2):
                    if self.can_afford(UnitTypeId.STARGATE) and not self.already_pending(UnitTypeId.STARGATE):
                        await self.build(UnitTypeId.STARGATE, near=pylon)

    ## 造兵
    async def build_offensive_force(self):
        # 無隊列化建造
        # for gw in self.units(UnitTypeId.GATEWAY).ready.noqueue:
        #     if not self.units(UnitTypeId.STALKER).amount > self.units(UnitTypeId.VOIDRAY).amount:
        #
        #         if self.can_afford(UnitTypeId.STALKER) and self.supply_left > 0:
        #             await self.do(gw.train(UnitTypeId.STALKER))

        for sg in self.units(UnitTypeId.STARGATE).ready.noqueue:
            if self.can_afford(UnitTypeId.VOIDRAY) and self.supply_left > 0:
                await self.do(sg.train(UnitTypeId.VOIDRAY))

    ## 尋找目標
    def find_target(self, state):
        if len(self.known_enemy_units) > 0:
            # 隨機選取敵方單位
            return random.choice(self.known_enemy_units)
        elif len(self.known_enemy_units) > 0:
            # 隨機選取敵方建築
            return random.choice(self.known_enemy_structures)
        else:
            # 返回敵方出生點位
            return self.enemy_start_locations[0]

    ## 進攻
    async def attack(self):
        if len(self.units(UnitTypeId.VOIDRAY).idle) > 0:
            choice = random.randrange(0, 4)
            target = False
            if self.iteration > self.do_something_after:
                if choice == 0:
                    # 什麼都不做
                    wait = random.randrange(20, 165)
                    self.do_something_after = self.iteration + wait

                elif choice == 1:
                    # 攻擊離星靈樞紐最近的單位
                    if len(self.known_enemy_units) > 0:
                        target = self.known_enemy_units.closest_to(random.choice(self.units(UnitTypeId.NEXUS)))

                elif choice == 2:
                    # 攻擊敵方建築
                    if len(self.known_enemy_structures) > 0:
                        target = random.choice(self.known_enemy_structures)

                elif choice == 3:
                    # 攻擊敵方出生位置
                    target = self.enemy_start_locations[0]

                if target:
                    for vr in self.units(UnitTypeId.VOIDRAY).idle:
                        await self.do(vr.attack(target))
                y = np.zeros(4)
                y[choice] = 1
                print(y)
                self.train_data.append([y, self.flipped])


## 啟動遊戲
run_game(maps.get("AcidPlantLE"), [
    Bot(Race.Protoss, SentdeBot()), Computer(Race.Terran, Difficulty.Medium)
], realtime=False)

可以看到train_data文件夾下存儲了勝利數據

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

深入理解Kafka必知必會(2)

Kafka目前有哪些內部topic,它們都有什麼特徵?各自的作用又是什麼?

__consumer_offsets:作用是保存 Kafka 消費者的位移信息
__transaction_state:用來存儲事務日誌消息

優先副本是什麼?它有什麼特殊的作用?

所謂的優先副本是指在AR集合列表中的第一個副本。
理想情況下,優先副本就是該分區的leader 副本,所以也可以稱之為 preferred leader。Kafka 要確保所有主題的優先副本在 Kafka 集群中均勻分佈,這樣就保證了所有分區的 leader 均衡分佈。以此來促進集群的負載均衡,這一行為也可以稱為“分區平衡”。

Kafka有哪幾處地方有分區分配的概念?簡述大致的過程及原理

  1. 生產者的分區分配是指為每條消息指定其所要發往的分區。可以編寫一個具體的類實現org.apache.kafka.clients.producer.Partitioner接口。
  2. 消費者中的分區分配是指為消費者指定其可以消費消息的分區。Kafka 提供了消費者客戶端參數 partition.assignment.strategy 來設置消費者與訂閱主題之間的分區分配策略。
  3. 分區副本的分配是指為集群制定創建主題時的分區副本分配方案,即在哪個 broker 中創建哪些分區的副本。kafka-topics.sh 腳本中提供了一個 replica-assignment 參數來手動指定分區副本的分配方案。

簡述Kafka的日誌目錄結構

Kafka 中的消息是以主題為基本單位進行歸類的,各個主題在邏輯上相互獨立。每個主題又可以分為一個或多個分區。不考慮多副本的情況,一個分區對應一個日誌(Log)。為了防止 Log 過大,Kafka 又引入了日誌分段(LogSegment)的概念,將 Log 切分為多個 LogSegment,相當於一個巨型文件被平均分配為多個相對較小的文件。

Log 和 LogSegment 也不是純粹物理意義上的概念,Log 在物理上只以文件夾的形式存儲,而每個 LogSegment 對應於磁盤上的一個日誌文件和兩個索引文件,以及可能的其他文件(比如以“.txnindex”為後綴的事務索引文件)

Kafka中有那些索引文件?

每個日誌分段文件對應了兩個索引文件,主要用來提高查找消息的效率。
偏移量索引文件用來建立消息偏移量(offset)到物理地址之間的映射關係,方便快速定位消息所在的物理文件位置
時間戳索引文件則根據指定的時間戳(timestamp)來查找對應的偏移量信息。

如果我指定了一個offset,Kafka怎麼查找到對應的消息?

Kafka是通過seek() 方法來指定消費的,在執行seek() 方法之前要去執行一次poll()方法,等到分配到分區之後會去對應的分區的指定位置開始消費,如果指定的位置發生了越界,那麼會根據auto.offset.reset 參數設置的情況進行消費。

如果我指定了一個timestamp,Kafka怎麼查找到對應的消息?

Kafka提供了一個 offsetsForTimes() 方法,通過 timestamp 來查詢與此對應的分區位置。offsetsForTimes() 方法的參數 timestampsToSearch 是一個 Map 類型,key 為待查詢的分區,而 value 為待查詢的時間戳,該方法會返回時間戳大於等於待查詢時間的第一條消息對應的位置和時間戳,對應於 OffsetAndTimestamp 中的 offset 和 timestamp 字段。

聊一聊你對Kafka的Log Retention的理解

日誌刪除(Log Retention):按照一定的保留策略直接刪除不符合條件的日誌分段。
我們可以通過 broker 端參數 log.cleanup.policy 來設置日誌清理策略,此參數的默認值為“delete”,即採用日誌刪除的清理策略。

  1. 基於時間
    日誌刪除任務會檢查當前日誌文件中是否有保留時間超過設定的閾值(retentionMs)來尋找可刪除的日誌分段文件集合(deletableSegments)retentionMs 可以通過 broker 端參數 log.retention.hours、log.retention.minutes 和 log.retention.ms 來配置,其中 log.retention.ms 的優先級最高,log.retention.minutes 次之,log.retention.hours 最低。默認情況下只配置了 log.retention.hours 參數,其值為168,故默認情況下日誌分段文件的保留時間為7天。
    刪除日誌分段時,首先會從 Log 對象中所維護日誌分段的跳躍表中移除待刪除的日誌分段,以保證沒有線程對這些日誌分段進行讀取操作。然後將日誌分段所對應的所有文件添加上“.deleted”的後綴(當然也包括對應的索引文件)。最後交由一個以“delete-file”命名的延遲任務來刪除這些以“.deleted”為後綴的文件,這個任務的延遲執行時間可以通過 file.delete.delay.ms 參數來調配,此參數的默認值為60000,即1分鐘。

  2. 基於日誌大小
    日誌刪除任務會檢查當前日誌的大小是否超過設定的閾值(retentionSize)來尋找可刪除的日誌分段的文件集合(deletableSegments)。
    retentionSize 可以通過 broker 端參數 log.retention.bytes 來配置,默認值為-1,表示無窮大。注意 log.retention.bytes 配置的是 Log 中所有日誌文件的總大小,而不是單個日誌分段(確切地說應該為 .log 日誌文件)的大小。單個日誌分段的大小由 broker 端參數 log.segment.bytes 來限制,默認值為1073741824,即 1GB。
    這個刪除操作和基於時間的保留策略的刪除操作相同。
  3. 基於日誌起始偏移量
    基於日誌起始偏移量的保留策略的判斷依據是某日誌分段的下一個日誌分段的起始偏移量 baseOffset 是否小於等於 logStartOffset,若是,則可以刪除此日誌分段。

如上圖所示,假設 logStartOffset 等於25,日誌分段1的起始偏移量為0,日誌分段2的起始偏移量為11,日誌分段3的起始偏移量為23,通過如下動作收集可刪除的日誌分段的文件集合 deletableSegments:

從頭開始遍歷每個日誌分段,日誌分段1的下一個日誌分段的起始偏移量為11,小於 logStartOffset 的大小,將日誌分段1加入 deletableSegments。
日誌分段2的下一個日誌偏移量的起始偏移量為23,也小於 logStartOffset 的大小,將日誌分段2加入 deletableSegments。
日誌分段3的下一個日誌偏移量在 logStartOffset 的右側,故從日誌分段3開始的所有日誌分段都不會加入 deletableSegments。
收集完可刪除的日誌分段的文件集合之後的刪除操作同基於日誌大小的保留策略和基於時間的保留策略相同

聊一聊你對Kafka的Log Compaction的理解

日誌壓縮(Log Compaction):針對每個消息的 key 進行整合,對於有相同 key 的不同 value 值,只保留最後一個版本。
如果要採用日誌壓縮的清理策略,就需要將 log.cleanup.policy 設置為“compact”,並且還需要將 log.cleaner.enable (默認值為 true)設定為 true。

如下圖所示,Log Compaction 對於有相同 key 的不同 value 值,只保留最後一個版本。如果應用只關心 key 對應的最新 value 值,則可以開啟 Kafka 的日誌清理功能,Kafka 會定期將相同 key 的消息進行合併,只保留最新的 value 值。

聊一聊你對Kafka底層存儲的理解

頁緩存

頁緩存是操作系統實現的一種主要的磁盤緩存,以此用來減少對磁盤 I/O 的操作。具體來說,就是把磁盤中的數據緩存到內存中,把對磁盤的訪問變為對內存的訪問。

當一個進程準備讀取磁盤上的文件內容時,操作系統會先查看待讀取的數據所在的頁(page)是否在頁緩存(pagecache)中,如果存在(命中)則直接返回數據,從而避免了對物理磁盤的 I/O 操作;如果沒有命中,則操作系統會向磁盤發起讀取請求並將讀取的數據頁存入頁緩存,之後再將數據返回給進程。

同樣,如果一個進程需要將數據寫入磁盤,那麼操作系統也會檢測數據對應的頁是否在頁緩存中,如果不存在,則會先在頁緩存中添加相應的頁,最後將數據寫入對應的頁。被修改過後的頁也就變成了臟頁,操作系統會在合適的時間把臟頁中的數據寫入磁盤,以保持數據的一致性。

用過 Java 的人一般都知道兩點事實:對象的內存開銷非常大,通常會是真實數據大小的幾倍甚至更多,空間使用率低下;Java 的垃圾回收會隨着堆內數據的增多而變得越來越慢。基於這些因素,使用文件系統並依賴於頁緩存的做法明顯要優於維護一個進程內緩存或其他結構,至少我們可以省去了一份進程內部的緩存消耗,同時還可以通過結構緊湊的字節碼來替代使用對象的方式以節省更多的空間。

此外,即使 Kafka 服務重啟,頁緩存還是會保持有效,然而進程內的緩存卻需要重建。這樣也極大地簡化了代碼邏輯,因為維護頁緩存和文件之間的一致性交由操作系統來負責,這樣會比進程內維護更加安全有效。

零拷貝

除了消息順序追加、頁緩存等技術,Kafka 還使用零拷貝(Zero-Copy)技術來進一步提升性能。所謂的零拷貝是指將數據直接從磁盤文件複製到網卡設備中,而不需要經由應用程序之手。零拷貝大大提高了應用程序的性能,減少了內核和用戶模式之間的上下文切換。對 Linux 操作系統而言,零拷貝技術依賴於底層的 sendfile() 方法實現。對應於 Java 語言,FileChannal.transferTo() 方法的底層實現就是 sendfile() 方法。

聊一聊Kafka的延時操作的原理

Kafka 中有多種延時操作,比如延時生產,還有延時拉取(DelayedFetch)、延時數據刪除(DelayedDeleteRecords)等。
延時操作創建之後會被加入延時操作管理器(DelayedOperationPurgatory)來做專門的處理。延時操作有可能會超時,每個延時操作管理器都會配備一個定時器(SystemTimer)來做超時管理,定時器的底層就是採用時間輪(TimingWheel)實現的。

聊一聊Kafka控制器的作用

在 Kafka 集群中會有一個或多個 broker,其中有一個 broker 會被選舉為控制器(Kafka Controller),它負責管理整個集群中所有分區和副本的狀態。當某個分區的 leader 副本出現故障時,由控制器負責為該分區選舉新的 leader 副本。當檢測到某個分區的 ISR 集合發生變化時,由控制器負責通知所有broker更新其元數據信息。當使用 kafka-topics.sh 腳本為某個 topic 增加分區數量時,同樣還是由控制器負責分區的重新分配。

Kafka的舊版Scala的消費者客戶端的設計有什麼缺陷?

如上圖,舊版消費者客戶端每個消費組( )在 ZooKeeper 中都維護了一個 /consumers/ /ids 路徑,在此路徑下使用臨時節點記錄隸屬於此消費組的消費者的唯一標識(consumerIdString),/consumers/ /owner 路徑下記錄了分區和消費者的對應關係,/consumers/ /offsets 路徑下記錄了此消費組在分區中對應的消費位移。

每個消費者在啟動時都會在 /consumers/ /ids 和 /brokers/ids 路徑上註冊一個監聽器。當 /consumers/ /ids 路徑下的子節點發生變化時,表示消費組中的消費者發生了變化;當 /brokers/ids 路徑下的子節點發生變化時,表示 broker 出現了增減。這樣通過 ZooKeeper 所提供的 Watcher,每個消費者就可以監聽消費組和 Kafka 集群的狀態了。

這種方式下每個消費者對 ZooKeeper 的相關路徑分別進行監聽,當觸發再均衡操作時,一個消費組下的所有消費者會同時進行再均衡操作,而消費者之間並不知道彼此操作的結果,這樣可能導致 Kafka 工作在一個不正確的狀態。與此同時,這種嚴重依賴於 ZooKeeper 集群的做法還有兩個比較嚴重的問題。

  1. 羊群效應(Herd Effect):所謂的羊群效應是指ZooKeeper 中一個被監聽的節點變化,大量的 Watcher 通知被發送到客戶端,導致在通知期間的其他操作延遲,也有可能發生類似死鎖的情況。
  2. 腦裂問題(Split Brain):消費者進行再均衡操作時每個消費者都與 ZooKeeper 進行通信以判斷消費者或broker變化的情況,由於 ZooKeeper 本身的特性,可能導致在同一時刻各個消費者獲取的狀態不一致,這樣會導致異常問題發生。

消費再均衡的原理是什麼?(提示:消費者協調器和消費組協調器)

就目前而言,一共有如下幾種情形會觸發再均衡的操作:

  • 有新的消費者加入消費組。
  • 有消費者宕機下線。消費者並不一定需要真正下線,例如遇到長時間的GC、網絡延遲導致消費者長時間未向 GroupCoordinator 發送心跳等情況時,GroupCoordinator 會認為消費者已經下線。
  • 有消費者主動退出消費組(發送 LeaveGroupRequest 請求)。比如客戶端調用了 unsubscrible() 方法取消對某些主題的訂閱。
  • 消費組所對應的 GroupCoorinator 節點發生了變更。
  • 消費組內所訂閱的任一主題或者主題的分區數量發生變化。

GroupCoordinator 是 Kafka 服務端中用於管理消費組的組件。而消費者客戶端中的 ConsumerCoordinator 組件負責與 GroupCoordinator 進行交互。

第一階段(FIND_COORDINATOR)

消費者需要確定它所屬的消費組對應的 GroupCoordinator 所在的 broker,並創建與該 broker 相互通信的網絡連接。如果消費者已經保存了與消費組對應的 GroupCoordinator 節點的信息,並且與它之間的網絡連接是正常的,那麼就可以進入第二階段。否則,就需要向集群中的某個節點發送 FindCoordinatorRequest 請求來查找對應的 GroupCoordinator,這裏的“某個節點”並非是集群中的任意節點,而是負載最小的節點。

第二階段(JOIN_GROUP)

在成功找到消費組所對應的 GroupCoordinator 之後就進入加入消費組的階段,在此階段的消費者會向 GroupCoordinator 發送 JoinGroupRequest 請求,並處理響應。

選舉消費組的leader
如果消費組內還沒有 leader,那麼第一個加入消費組的消費者即為消費組的 leader。如果某一時刻 leader 消費者由於某些原因退出了消費組,那麼會重新選舉一個新的 leader

選舉分區分配策略

  1. 收集各個消費者支持的所有分配策略,組成候選集 candidates。
  2. 每個消費者從候選集 candidates 中找出第一個自身支持的策略,為這個策略投上一票。
  3. 計算候選集中各個策略的選票數,選票數最多的策略即為當前消費組的分配策略。

第三階段(SYNC_GROUP)

leader 消費者根據在第二階段中選舉出來的分區分配策略來實施具體的分區分配,在此之後需要將分配的方案同步給各個消費者,通過 GroupCoordinator 這個“中間人”來負責轉發同步分配方案的。

第四階段(HEARTBEAT)

進入這個階段之後,消費組中的所有消費者就會處於正常工作狀態。在正式消費之前,消費者還需要確定拉取消息的起始位置。假設之前已經將最後的消費位移提交到了 GroupCoordinator,並且 GroupCoordinator 將其保存到了 Kafka 內部的 __consumer_offsets 主題中,此時消費者可以通過 OffsetFetchRequest 請求獲取上次提交的消費位移並從此處繼續消費。

消費者通過向 GroupCoordinator 發送心跳來維持它們與消費組的從屬關係,以及它們對分區的所有權關係。只要消費者以正常的時間間隔發送心跳,就被認為是活躍的,說明它還在讀取分區中的消息。心跳線程是一個獨立的線程,可以在輪詢消息的空檔發送心跳。如果消費者停止發送心跳的時間足夠長,則整個會話就被判定為過期,GroupCoordinator 也會認為這個消費者已經死亡,就會觸發一次再均衡行為。

Kafka中的冪等是怎麼實現的?

為了實現生產者的冪等性,Kafka 為此引入了 producer id(以下簡稱 PID)和序列號(sequence number)這兩個概念。

每個新的生產者實例在初始化的時候都會被分配一個 PID,這個 PID 對用戶而言是完全透明的。對於每個 PID,消息發送到的每一個分區都有對應的序列號,這些序列號從0開始單調遞增。生產者每發送一條消息就會將 <PID,分區> 對應的序列號的值加1。

broker 端會在內存中為每一對 <PID,分區> 維護一個序列號。對於收到的每一條消息,只有當它的序列號的值(SN_new)比 broker 端中維護的對應的序列號的值(SN_old)大1(即 SN_new = SN_old + 1)時,broker 才會接收它。如果 SN_new< SN_old + 1,那麼說明消息被重複寫入,broker 可以直接將其丟棄。如果 SN_new> SN_old + 1,那麼說明中間有數據尚未寫入,出現了亂序,暗示可能有消息丟失,對應的生產者會拋出 OutOfOrderSequenceException,這個異常是一個嚴重的異常,後續的諸如 send()、beginTransaction()、commitTransaction() 等方法的調用都會拋出 IllegalStateException 的異常。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

【python測試開發棧】帶你徹底搞明白python3編碼原理

在之前的文章中,我們介紹過編碼格式的發展史:[文章傳送門-todo]。今天我們通過幾個例子,來徹底搞清楚python3中的編碼格式原理,這樣你之後寫python腳本時碰到編碼問題,才能有章可循。

我們先搞清楚幾個概念:

  • 系統默認編碼:指python解釋器默認的編碼格式,在python文件頭部沒有聲明其他編碼格式時,python3默認的編碼格式是utf-8。
  • 本地默認編碼:操作系統默認的編碼,常見的Windows的默認編碼是gbk,Linux的默認編碼是UTF-8。
  • python文件頭部聲明編碼格式:修改的是文件的默認編碼格式,只是會影響python解釋器讀取python文件時的編碼格式,並不會改變系統默認編碼和本地默認編碼。

通過python自帶的庫,可以查看系統默認編碼和本地默認編碼

Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> sys.getdefaultencoding()
'utf-8'
>>> import locale
>>> locale.getdefaultlocale()
('zh_CN', 'cp936')
>>>

注意,因為我在windows系統的電腦上 進行測試,所以系統默認編碼返回“cp936”, 這是代碼頁(是字符編碼集的別名),而936對應的就是gbk。如果你在linux或者mac上執行上面的代碼,應該會返回utf-8編碼。

其實總結來看,容易出現亂碼的場景,基本都與讀寫程序有關,比如:讀取/寫入某個文件,或者從網絡流中讀取數據等,因為這個過程中涉及到了編碼解碼的過程,只要編碼和解碼的編碼格式對應不上,就容易出現亂碼。下面我們舉兩個具體的例子,來驗證下python的編碼原理,幫助你理解這個過程。注意:下面的例子都是在pycharm中寫的。

01默認的編碼格式

我們新建一個encode_demo.py的文件,其文件默認的編碼格式是UTF-8(可以從pycharm右下角看到編碼格式),代碼如下:

"""
    @author: asus
    @time: 2019/11/21
    @function: 驗證編碼格式
"""
import sys, locale


def write_str_default_encode():
    s = "我是一個str"
    print(s)
    print(type(s))
    print(sys.getdefaultencoding())
    print(locale.getdefaultlocale())

    with open("utf_file", "w", encoding="utf-8") as f:
        f.write(s)
    with open("gbk_file", "w", encoding="gbk") as f:
        f.write(s)
    with open("jis_file", "w", encoding="shift-jis") as f:
        f.write(s)


if __name__ == '__main__':
    write_str_default_encode()

我們先來猜測下結果,因為我們沒有聲明編碼格式,所以python解釋器默認用UTF-8去解碼文件,因為文件默認編碼格式就是UTF-8,所以字符串s可以正常打印。同時以UTF-8編碼格式寫文件不會出現亂碼,而以gbk和shift-jis(日文編碼)寫文件會出現亂碼(這裏說明一點,我是用pycharm直接打開生成的文件查看的,編輯器默認編碼是UTF-8,如果在windows上用記事本打開則其默認編碼跟隨系統是GBK,gbk_file和utf_file均不會出現亂碼,只有jis_file是亂碼),我們運行看下結果:

# 運行結果
我是一個str
<class 'str'>
utf-8
('zh_CN', 'cp936')

# 寫文件utf_file、gbk_file、jis_file文件內容分別是:
我是一個str
����һ��str
�䐥�꘢str

和我們猜測的結果一致,下面我們做個改變,在文件頭部聲明個編碼格式,再來看看效果。

02 python頭文件聲明編碼格式

因為上面文件encode_demo.py的格式是UTF-8,那麼我們就將其變為gbk編碼。同樣的我們先來推測下結果,在pycharm中,在python文件頭部聲明編碼為gbk后(頭部加上 # coding=gbk ),文件的編碼格式變成gbk,同時python解釋器會用gbk去解碼encode_demo.py文件,所以運行結果應該和用UTF-8編碼時一樣。運行結果如下:

# 運行結果
我是一個str
<class 'str'>
utf-8
('zh_CN', 'cp936')

# 寫文件utf_file、gbk_file、jis_file文件內容分別是:
我是一個str
����һ��str
�䐥�꘢str

結果確實是一樣的,證明我們推論是正確的。接下來我們再做個嘗試,假如我們將(# coding=gbk)去掉(需要注意,在pycharm中將 # coding=gbk去掉,並不會改變文件的編碼格式,也就是說encode_demo.py還是gbk編碼),我們再運行一次看結果:

  File "D:/codespace/python/pythonObject/pythonSample/basic/encodeDemo/encode_demo.py", line 4
SyntaxError: Non-UTF-8 code starting with '\xd1' in file D:/codespace/python/pythonObject/pythonSample/basic/encodeDemo/encode_demo.py on line 5, but no encoding declared; see http://python.org/dev/peps/pep-0263/ for details

運行直接報錯了,我們加個斷點,看看具體的異常信息:

看錯誤提示是UnicodeDecodeError,python解釋器在對encode_demo.py文件解碼時,使用默認的UTF-8編碼,但是文件本身是gbk編碼,所以當碰到有中文沒辦法識別時,就拋出DecodeError。

03 敲黑板,划重點

python3中的str和bytes

python3的重要特性之一就是對字符串和二進制流做了嚴格的區分,我們聲明的字符串都是str類型,不過Str和bytes是可以相互轉換的:

def str_transfor_bytes():
    s = '我是一個測試Str'
    print(type(s))
    # str 轉bytes
    b = s.encode()
    print(b)
    print(type(b))
    # bytes轉str
    c = b.decode('utf-8')
    print(c)
    print(type(c))


if __name__ == '__main__':
    str_transfor_bytes()

需要注意一點:在調用encode()和decode()方法時,如果不傳參數,則會使用python解釋器默認的編碼格式UTF-8(如果不在python頭文件聲明編碼格式)。但是如果傳參的話,encode和decode使用的編碼格式要能對應上。

python3默認編碼是UTF-8?還是Unicode?

經常在很多文章里看到,python3的默認編碼格式是Unicode,但是我在本文中卻一直在說python3的默認編碼格式是UTF-8,那麼哪種說法是正確的呢?其實兩種說法都對,主要得搞清楚Unicode和UTF-8的區別(之前文章有提到):

  • Unicode是一個字符集,說白了就是把各種編碼的映射關係全都整合起來,不過它是不可變長的,全部都以兩個字節或四個字節來表示,佔用的內存空間比較大。
  • UTF-8是Unicode的一種實現方式,主要對 Unicode 碼的數據進行轉換,方便存儲和網絡傳輸 。它是可變長編碼,比如對於英文字母,它使用一個字節就可以表示。

在python3內存中使用的字符串全都是Unicode碼,當python解釋器解析python文件時,默認使用UTF-8編碼。

open()方法默認使用本地編碼

在上面的例子中,我們往磁盤寫入文件時,都指定了編碼格式。如果不指定編碼格式,那麼默認將使用操作系統本地默認的編碼格式,比如:Linux默認是UTF-8,windows默認是GBK。其實這也好理解,因為和磁盤交互,肯定要考慮操作系統的編碼格式。這有區別於encode()和decode()使用的是python解釋器的默認編碼格式,千萬別搞混淆了。

總結

不知道你看完上面的例子后,是否已經徹底理解了python3的編碼原理。不過所有的編碼問題,都逃不過“編碼”和“解碼”兩個過程,當你碰到編碼問題時,先確定源文件使用的編碼,再確定目標文件需要的編碼格式,只要能匹配,一般就可以解決編碼的問題。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

小白學 Python 爬蟲(2):前置準備(一)基本類庫的安裝

人生苦短,我用 Python

前文傳送門:

本篇內容較長,各位同學可以先收藏后再看~~

在開始講爬蟲之前,還是先把環境搞搞好,工欲善其事必先利其器嘛~~~

本篇文章主要介紹 Python 爬蟲所使用到的請求庫和解析庫,請求庫用來請求目標內容,解析庫用來解析請求回來的內容。

開發環境

首先介紹小編本地的開發環境:

  • Python3.7.4
  • win10

差不多就這些,最基礎的環境,其他環境需要我們一個一個安裝,現在開始。

請求庫

雖然 Python 為我們內置了 HTTP 請求庫 urllib ,使用姿勢並不是很優雅,但是很多第三方的提供的 HTTP 庫確實更加的簡潔優雅,我們下面開始。

Requests

Requests 類庫是一個第三方提供的用於發送 HTTP 同步請求的類庫,相比較 Python 自帶的 urllib 類庫更加的方便和簡潔。

Python 為我們提供了包管理工具 pip ,使用 pip 安裝將會非常的方便,安裝命令如下:

pip install requests

驗證:

C:\Users\inwsy>python
Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import requests

首先在 CMD 命令行中輸入 python ,進入 python 的命令行模式,然後輸入 import requests 如果沒有任何錯誤提示,說明我們已經成功安裝 Requests 類庫。

Selenium

Selenium 現在更多的是用來做自動化測試工具,相關的書籍也不少,同時,我們也可以使用它來做爬蟲工具,畢竟是自動化測試么,利用它我們可以讓瀏覽器執行我們想要的動作,比如點擊某個按鈕、滾動滑輪之類的操作,這對我們模擬真實用戶操作是非常方便的。

安裝命令如下:

pip install selenium

驗證:

C:\Users\inwsy>python
Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import selenium

這樣沒報錯我們就安裝完成,但是你以為這樣就算好了么?圖樣圖森破啊。

ChromeDriver

我們還需要瀏覽器的支持來配合 selenium 的工作,開發人員嘛,常用的瀏覽器莫非那麼幾種:Chrome、Firefox,那位說 IE 的同學,你給我站起來,小心我跳起來打你膝蓋,還有說 360 瀏覽器的,你們可讓我省省心吧。

接下來,安裝 Chrome 瀏覽器就不用講了吧。。。。

再接下來,我們開始安裝 ChromeDriver ,安裝了 ChromeDriver 后,我們才能通過剛才安裝的 selenium 來驅動 Chrome 來完成各種騷操作。

首先,我們需要查看自己的 Chrome 瀏覽器的版本,在 Chrome 瀏覽器右上角的三個點鐘,點擊 幫助 -> 關於,如下圖:

將這個版本找個小本本記下來,小編這裏的版本為: 版本 78.0.3904.97(正式版本) (64 位)

接下來我們需要去 ChromeDriver 的官網查看當前 Chrome 對應的驅動。

官網地址:

因某些原因,訪問時需某些手段,訪問不了的就看小編為大家準備的版本對應表格吧。。。

ChromeDriver Version Chrome Version
78.0.3904.11 78
77.0.3865.40 77
77.0.3865.10 77
76.0.3809.126 76
76.0.3809.68 76
76.0.3809.25 76
76.0.3809.12 76
75.0.3770.90 75
75.0.3770.8 75
74.0.3729.6 74
73.0.3683.68 73
72.0.3626.69 72
2.46 71-73
2.45 70-72
2.44 69-71
2.43 69-71
2.42 68-70
2.41 67-69
2.40 66-68
2.39 66-68
2.38 65-67
2.37 64-66
2.36 63-65
2.35 62-64

順便小編找到了國內對應的下載的鏡像站,由淘寶提供,如下:

雖然和小編本地的小版本對不上,但是看樣子只要大版本符合應該沒啥問題,so,去鏡像站下載對應的版本即可,小編這裏下載的版本是:78.0.3904.70 ,ChromeDriver 78版本的最後一個小版本。

下載完成后,將可執行文件 chromedriver.exe 移動至 Python 安裝目錄的 Scripts 目錄下。如果使用默認安裝未修改過安裝目錄的話目錄是:%homepath%\AppData\Local\Programs\Python\Python37\Scripts ,如果有過修改,那就自力更生吧。。。

chromedriver.exe 添加后結果如下圖:

驗證:

還是在 CMD 命令行中,輸入以下內容:

C:\Users\inwsy>python
Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from selenium import webdriver
>>> browser = webdriver.Chrome()

如果打開一個空白的 Chrome 頁面說明安裝成功。

GeckoDriver

上面我們通過安裝 Chrome 的驅動完成了 Selenium 與 Chrome 的對接,想要完成 Selenium 與 FireFox 的對接則需要安裝另一個驅動 GeckoDriver 。

FireFox 的安裝小編這裏就不介紹了,大家最好去官網下載安裝,路徑如下:

FireFox 官網地址:

GeckoDriver 的下載需要去 Github (全球最大的同性交友網站),下載路徑小編已經找好了,可以選擇最新的 releases 版本進行下載。

下載地址:

選擇對應自己的環境,小編這裏選擇 win-64 ,版本為 v0.26.0 進行下載。

具體配置方式和上面一樣,將可執行的 .exe 文件放入 %homepath%\AppData\Local\Programs\Python\Python37\Scripts 目錄下即可。

驗證:

還是在 CMD 命令行中,輸入以下內容:

C:\Users\inwsy>python
Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from selenium import webdriver
>>> browser = webdriver.Firefox()

應該是可以正常打開一個空白的 FireFox 頁面的,結果如下:

注意: GeckoDriver 指出一點,當前的版本在 win 下使用有已知 bug ,需要安裝微軟的一個插件才能解決,原文如下:

You must still have the installed on your system for the binary to run. This is a known bug which we weren’t able fix for this release.

插件下載地址:

請各位同學選擇自己對應的系統版本進行下載安裝。

Aiohttp

上面我們介紹了同步的 Http 請求庫 Requests ,而 Aiohttp 則是一個提供異步 Http 請求的類庫。

那麼,問題來了,什麼是同步請求?什麼是異步請求呢?

  • 同步:阻塞式,簡單理解就是當發出一個請求以後,程序會一直等待這個請求響應,直到響應以後,才繼續做下一步。
  • 異步:非阻塞式,還是上面的例子,當發出一個請求以後,程序並不會阻塞在這裏,等待請求響應,而是可以去做其他事情。

從資源消耗和效率上來說,同步請求是肯定比不過異步請求的,這也是為什麼異步請求會比同步請求擁有更大的吞吐量。在抓取數據時使用異步請求,可以大大提升抓取的效率。

如果還想了解跟多有關 aiohttp 的內容,可以訪問官方文檔: 。

aiohttp 安裝如下:

pip install aiohttp

aiohttp 還推薦我們安裝另外兩個庫,一個是字符編碼檢測庫 cchardet ,另一個是加速DNS的解析庫 aiodns 。

安裝 cchardet 庫:

pip install cchardet

安裝 aiodns 庫:

pip install aiodns

aiohttp 十分貼心的為我們準備了整合的安裝命令,無需一個一個鍵入命令,如下:

pip install aiohttp[speedups]

驗證:

C:\Users\inwsy>python
Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import aiohttp

沒報錯就安裝成功。

解析庫

lxml

lxml 是 Python 的一個解析庫,支持 HTML 和 XML 的解析,支持 XPath 的解析方式,而且解析效率非常高。

什麼是 XPath ?

XPath即為XML路徑語言(XML Path Language),它是一種用來確定XML文檔中某部分位置的語言。
XPath基於XML的樹狀結構,提供在數據結構樹中找尋節點的能力。起初XPath的提出的初衷是將其作為一個通用的、介於XPointer與XSL間的語法模型。

以上內容來源《百度百科》

好吧,小編說人話,就是可以從 XML 文檔或者 HTML 文檔中快速的定位到所需要的位置的路徑語言。

還沒看懂?emmmmmmmmmmm,我們可以使用 XPath 快速的取出 XML 或者 HTML 文檔中想要的值。用法的話我們放到後面再聊。

安裝 lxml 庫:

pip install lxml

驗證:

C:\Users\inwsy>python
Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import lxml

沒報錯就安裝成功。

Beautiful Soup

Beautiful Soup 同樣也是一個 Python 的 HTML 或 XML 的解析庫 。它擁有強大的解析能力,我們可以使用它更方便的從 HTML 文檔中提取數據。

首先,放一下 Beautiful Soup 的官方網址,有各種問題都可以在官網查看文檔,各位同學養成一個好習慣,有問題找官方文檔,雖然是英文的,使用 Chrome 自帶的翻譯功能還是勉強能看的。

官方網站:

安裝方式依然使用 pip 進行安裝:

pip install beautifulsoup4

Beautiful Soup 的 HTML 和 XML 解析器是依賴於 lxml 庫的,所以在此之前請確保已經成功安裝好了 lxml 庫 。

驗證:

C:\Users\inwsy>python
Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from bs4 import BeautifulSoup

沒報錯就安裝成功。

pyquery

pyquery 同樣也是一個網頁解析庫,只不過和前面兩個有區別的是它提供了類似 jQuery 的語法來解析 HTML 文檔,前端有經驗的同學應該會非常喜歡這款解析庫。

首先還是放一下 pyquery 的官方文檔地址。

官方文檔:

安裝:

pip install pyquery

驗證:

C:\Users\inwsy>python
Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyquery

沒報錯就安裝成功。

本篇的內容就先到這裏結束了。請各位同學在自己的電腦上將上面介紹的內容都安裝一遍,以便後續學習使用。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

fastjason常用方法

什麼是fastjson?

Fastjson是一個Java語言編寫的高性能功能完善的JSON庫。它採用一種“假定有序快速匹配”的算法,把JSON Parse的性能提升到極致,是目前Java語言中最快的JSON庫。Fastjson接口簡單易用,已經被廣泛使用在緩存序列化、協議交互、Web輸出、Android客戶端等多種應用場景。

主要特點:

  • 快速FAST (比其它任何基於Java的解析器和生成器更快,包括jackson)
  • 強大(支持普通JDK類包括任意Java Bean Class、Collection、Map、Date或enum)
  • 零依賴(沒有依賴其它任何類庫除了JDK)

背景

最近關於fastjson的消息,引起了很多人的關注!

fastjson爆出重大漏洞,攻擊者可使整個業務癱瘓

漏洞描述

常用JSON組件FastJson存在遠程代碼執行漏洞,攻擊者可通過精心構建的json報文對目標服務器執行任意命令,從而獲得服務器權限。此次爆發的漏洞為以往漏洞中autoType的繞過。

影響範圍

FastJson < 1.2.48

很多開發者才猛然發現,fastjson已經深入到我們開發工作的方方面面。那麼除了趕快升級你的json外,我們來挖挖fastjson最常用的用法。

fastjson常用方式

1.maven依賴(記得升級到1.2.48以上版本哦)

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
         <groupId>com.alibaba</groupId>
         <artifactId>fastjson</artifactId>
         <version>1.2.62</version>
        </dependency>    

2.FastJson對於json格式字符串的解析主要用到了一下三個類:

(1)JSON:fastJson的解析器,用於JSON格式字符串與JSON對象及javaBean之間的轉換。

(2)JSONObject:fastJson提供的json對象。

(3)JSONArray:fastJson提供json數組對象。

3.常用方式

3.1 string和java對象

 

實例1:對象轉json字符串

        Map<String,String> map=new HashMap<String,String>();
        map.put("code","0");
        map.put("message","ok");
        String json=JSON.toJSONString(map);
        System.out.println(json);

輸出結果為:

{"code":"0","message":"ok"}

實例2:字符串轉對象

        Map<String,String> map=new HashMap<String,String>();
        map.put("code","0");
        map.put("message","ok");
        String json=JSON.toJSONString(map);
        System.out.println(json);
        
        Map obj=(Map)JSON.parse(json);
        System.out.println("code="+obj.get("code")+",message="+obj.get("message"));

輸出結果

{"code":"0","message":"ok"}
code=0,message=ok

3.2 工具類JSONObject

    public static void main(String[] args) {
        Map<String,String> map=new HashMap<String,String>();
        map.put("code","0");
        map.put("message","ok");
        String json=JSON.toJSONString(map);
        System.out.println(json);
        
        Map obj=(Map)JSON.parse(json);
        System.out.println("code="+obj.get("code")+",message="+obj.get("message"));        
        
        String code=JSON.parseObject(json).getString("code");
        String message=JSON.parseObject(json).getString("message");
        System.out.println("code="+code+",message="+message);
    }

輸出結果

{"code":"0","message":"ok"}
code=0,message=ok
code=0,message=ok

3.3 數組對象

List<user> list=new ArrayList<user>(JSONArray.parseArray(jsonString,user.class)); 

Fastjson 與各種JSON庫的性能比較:

 

json庫 序列化性能 反序列化性能 jar大小
fastjson 1201 1216 fastjson-1.1.26.jar(356k)
fastjson-1.1.25-android.jar(226k)
jackson 1408 1915 jackson-annotations-2.1.1.jar(34k)
jackson-core-2.1.1.jar(206k)
jackson-databind-2.1.1.jar(922k)
總共1162k
gson 7421 5065 gson-2.2.2.jar(189k)
json-lib 27555 87292 json-lib-2.4-jdk15.jar(159k)


本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

Spring Boot2 系列教程(二十五)Spring Boot 整合 Jpa 多數據源

本文是 Spring Boot 整合數據持久化方案的最後一篇,主要和大夥來聊聊 Spring Boot 整合 Jpa 多數據源問題。在 Spring Boot 整合JbdcTemplate 多數據源、Spring Boot 整合 MyBatis 多數據源以及 Spring Boot 整合 Jpa 多數據源這三個知識點中,整合 Jpa 多數據源算是最複雜的一種,也是很多人在配置時最容易出錯的一種。本文大夥就跟着松哥的教程,一步一步整合 Jpa 多數據源。

工程創建

首先是創建一個 Spring Boot 工程,創建時添加基本的 Web、Jpa 以及 MySQL 依賴,如下:

創建完成后,添加 Druid 依賴,這裏和前文的要求一樣,要使用專為 Spring Boot 打造的 Druid,大夥可能發現了,如果整合多數據源一定要使用這個依賴,因為這個依賴中才有 DruidDataSourceBuilder,最後還要記得鎖定數據庫依賴的版本,因為可能大部分人用的還是 5.x 的 MySQL 而不是 8.x。完整依賴如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.1.10</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.28</version>
    <scope>runtime</scope>
</dependency>

如此之後,工程就創建成功了。

基本配置

在基本配置中,我們首先來配置多數據源基本信息以及 DataSource,首先在 application.properties 中添加如下配置信息:

#  數據源一
spring.datasource.one.username=root
spring.datasource.one.password=root
spring.datasource.one.url=jdbc:mysql:///test01?useUnicode=true&characterEncoding=UTF-8
spring.datasource.one.type=com.alibaba.druid.pool.DruidDataSource

#  數據源二
spring.datasource.two.username=root
spring.datasource.two.password=root
spring.datasource.two.url=jdbc:mysql:///test02?useUnicode=true&characterEncoding=UTF-8
spring.datasource.two.type=com.alibaba.druid.pool.DruidDataSource

# Jpa配置
spring.jpa.properties.database=mysql
spring.jpa.properties.show-sql=true
spring.jpa.properties.database-platform=mysql
spring.jpa.properties.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL57Dialect

這裏 Jpa 的配置和上文相比 key 中多了 properties,多數據源的配置和前文一致,然後接下來配置兩個 DataSource,如下:

@Configuration
public class DataSourceConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.one")
    @Primary
    DataSource dsOne() {
        return DruidDataSourceBuilder.create().build();
    }
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.two")
    DataSource dsTwo() {
        return DruidDataSourceBuilder.create().build();
    }
}

這裏的配置和前文的多數據源配置基本一致,但是注意多了一個在 Spring 中使用較少的註解 @Primary,這個註解一定不能少,否則在項目啟動時會出錯,@Primary 表示當某一個類存在多個實例時,優先使用哪個實例。

好了,這樣,DataSource 就有了。

多數據源配置

接下來配置 Jpa 的基本信息,這裏兩個數據源,我分別在兩個類中來配置,先來看第一個配置:

@Configuration
@EnableJpaRepositories(basePackages = "org.javaboy.jpa.dao",entityManagerFactoryRef = "localContainerEntityManagerFactoryBeanOne",transactionManagerRef = "platformTransactionManagerOne")
public class JpaConfigOne {
    @Autowired
    @Qualifier(value = "dsOne")
    DataSource dsOne;
    @Autowired
    JpaProperties jpaProperties;
    @Bean
    @Primary
    LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBeanOne(EntityManagerFactoryBuilder builder) {
        return builder.dataSource(dsOne)
                .packages("org.javaboy.jpa.model")
                .properties(jpaProperties.getProperties())
                .persistenceUnit("pu1")
                .build();
    }
    @Bean
    PlatformTransactionManager platformTransactionManagerOne(EntityManagerFactoryBuilder builder) {
        LocalContainerEntityManagerFactoryBean factoryBeanOne = localContainerEntityManagerFactoryBeanOne(builder);
        return new JpaTransactionManager(factoryBeanOne.getObject());
    }
}

首先這裏注入 dsOne,再注入 JpaProperties,JpaProperties 是系統提供的一個實例,裡邊的數據就是我們在 application.properties 中配置的 jpa 相關的配置。然後我們提供兩個 Bean,分別是 LocalContainerEntityManagerFactoryBean 和 PlatformTransactionManager 事務管理器,不同於 MyBatis 和 JdbcTemplate,在 Jpa 中,事務一定要配置。在提供 LocalContainerEntityManagerFactoryBean 的時候,需要指定 packages,這裏的 packages 指定的包就是這個數據源對應的實體類所在的位置,另外在這裏配置類上通過 @EnableJpaRepositories 註解指定 dao 所在的位置,以及 LocalContainerEntityManagerFactoryBean 和 PlatformTransactionManager 分別對應的引用的名字。

好了,這樣第一個就配置好了,第二個基本和這個類似,主要有幾個不同點:

  • dao 的位置不同
  • persistenceUnit 不同
  • 相關 bean 的名稱不同

注意實體類可以共用。

代碼如下:

@Configuration
@EnableJpaRepositories(basePackages = "org.javaboy.jpa.dao2",entityManagerFactoryRef = "localContainerEntityManagerFactoryBeanTwo",transactionManagerRef = "platformTransactionManagerTwo")
public class JpaConfigTwo {
    @Autowired
    @Qualifier(value = "dsTwo")
    DataSource dsTwo;
    @Autowired
    JpaProperties jpaProperties;
    @Bean
    LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBeanTwo(EntityManagerFactoryBuilder builder) {
        return builder.dataSource(dsTwo)
                .packages("org.javaboy.jpa.model")
                .properties(jpaProperties.getProperties())
                .persistenceUnit("pu2")
                .build();
    }
    @Bean
    PlatformTransactionManager platformTransactionManagerTwo(EntityManagerFactoryBuilder builder) {
        LocalContainerEntityManagerFactoryBean factoryBeanTwo = localContainerEntityManagerFactoryBeanTwo(builder);
        return new JpaTransactionManager(factoryBeanTwo.getObject());
    }
}

接下來,在對應位置分別提供相關的實體類和 dao 即可,數據源一的 dao 如下:

package org.javaboy.jpa.dao;
public interface UserDao extends JpaRepository<User,Integer> {
    List<User> getUserByAddressEqualsAndIdLessThanEqual(String address, Integer id);
    @Query(value = "select * from t_user where id=(select max(id) from t_user)",nativeQuery = true)
    User maxIdUser();
}

數據源二的 dao 如下:

package org.javaboy.jpa.dao2;
public interface UserDao2 extends JpaRepository<User,Integer> {
    List<User> getUserByAddressEqualsAndIdLessThanEqual(String address, Integer id);

    @Query(value = "select * from t_user where id=(select max(id) from t_user)",nativeQuery = true)
    User maxIdUser();
}

共同的實體類如下:

package org.javaboy.jpa.model;
@Entity(name = "t_user")
public class User {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;
    private String username;
    private String address;
    //省略getter/setter
}

到此,所有的配置就算完成了,接下來就可以在 Service 中注入不同的 UserDao,不同的 UserDao 操作不同的數據源。

其實整合 Jpa 多數據源也不算難,就是有幾個細節問題,這些細節問題解決,其實前面介紹的其他多數據源整個都差不多。

好了,本文就先介紹到這裏。

相關案例已經上傳到 GitHub,歡迎小夥伴們們下載:

掃碼關注松哥,公眾號後台回復 2TB,獲取松哥獨家 超2TB 免費 Java 學習乾貨

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

德國頒佈最新電動汽車補貼計畫 共投入12億歐元

據報導,自7月1日起,德國頒佈最新的電動車補貼計畫,截止目前已經有近2000位申請者,其中寶馬車主占多數。  
  為了促進電動車等環保車型的普及,德國為每位購買電動車的消費者提供4000歐元的補貼,插電式混合動力車的補貼為3000歐元。在計畫實施後,有1791位插電式混合動力車的車主申請了補貼,其中有581位購買了寶馬的車型。同時還有444位申請者購買了雷諾車型,大眾汽車買主為154位。   據統計,目前德國人汽車擁有量為4500萬輛,而其中僅有5萬輛是純電動或者是混合動力車輛。為改善這一情況,德國此次計畫共投入12億歐元,由政府和汽車製造商平攤,希望能夠在2019年6月底,即計畫截止期前售出40萬輛電動車。   文章來源:環球網

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

工信部:第287批申報目錄 純電動汽車型達424款

8月10日,工信部對申報《道路機動車輛生產企業及產品公告》(第287批)的車輛新產品進行公示。申報第287批公告的純電動車輛及底盤數量多達424款,插電式混合動力車輛及底盤數量為10款,純電動廂式運輸車車型數量多達129款。  
 
申報純電動客車車型數量約162款,詳情如下:   安凱9款,星凱龍4款,安達爾1款,北汽福田1款,比亞迪1款,長春北車電動汽車公司3款,長沙梅花汽車2款,成都客車2款,丹東黃海2款,東風汽車5款,東莞中汽宏遠2款,佛山飛馳2款,杭州長江客車2款,少林客車2款,南車時代5款,江蘇九龍1款,常隆客車1款,江蘇陸地方舟16款,江西江鈴4款,江西宜春客車廠1款,金華青年汽車12款,蘇州金龍8款,牡丹汽車2款,南京金龍8款,南京公共交通車輛廠1款,山西原野汽車6款,陝西躍迪1款,申龍客車1款,申沃客車1款,上海萬象1款,深圳五洲龍5款,川汽野馬1款,廈門金龍3款,廈門金旅10款,煙臺舒馳1款,揚子江汽車6款,揚州亞星3款,一汽客車1款,浙江南車電車8款,宇通6款,一汽集團1款,濟南豪沃1款,中通客車2款,中植一客5款,重慶恒通1款,珠海廣通1款。  
插電式混合動力客車及底盤共10款。具體到企業來看,安凱申報的插電式混合動力客車車型3款,昆明客車1款,揚州亞星2款,中通客車1款,重慶恒通2款。  
純電動轎車及乘用車的申報車型超過18款。其中,北汽3款,長城1款,東風悅達起亞1款,東風小康2款,合肥長安汽車1款,湖南江南汽車4款,奇瑞3款,榮成華泰1款,浙江吉利1款,力帆1款,比亞迪1款,江蘇卡威1款。   通過梳理第287批申報車型,筆者認為:純電動客車依然會是未來中國新能源汽車市場的主力軍,純電動物流車市場亟待爆發,純電動轎車和乘用車車型數量增多使得消費者的選擇權變大,此領域的競爭格局將會有所改變。第287批公告的公示時間為2016年8月10日至2016年8月16日。   文章來源:蓋世汽車

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

大規模搜索廣告的端到端一致性實時保障

一、背景

電商平台的搜索廣告數據處理鏈路通常較長,一般會經歷如下過程:

  • 廣告主在後台進行廣告投放;
  • 投放廣告品及關鍵詞數據寫入數據庫;
  • 數據庫中的數據通過全量構建(導入數據倉庫再進行離線批處理)或增量構建(藉助消息隊列和流計算引擎)的方式產出用於構建在線索引的“內容文件”;
  • BuildService基於“內容文件”,構建出在搜索服務檢索時使用的索引。

下圖是ICBU的廣告系統的買賣家數據處理鏈路:

右半部分(BP->DB)和offline部分即為廣告投放數據的更新過程。

複雜的數據處理鏈路結合海量(通常是億級以上)的商品數據,對線上全量商品的投放狀態正確性測試提出巨大挑戰。從數據庫、到離線大規模數據聯表處理、到在線索引構建,鏈路中的任一節點出現異常或數據延遲,都有可能會對廣告主以及平台造成“資損”影響,例如:

  • 廣告主在後台操作取消A商品的廣告投放,但是因為數據鏈路處理延遲,搜索引擎中A的狀態仍處於“推廣中”,導致A能繼續在買家搜索廣告時得到曝光,相應地當“點擊”行為發生時,造成錯誤扣款。
  • 廣告主設定某個產品只限定對某個地域/國家的客戶投放廣告,但是因為搜索引擎的過濾邏輯處理不恰當,導致客戶的廣告品在所有地區都進行廣告投放,同樣會造成錯誤點擊扣款。

傳統的測試手段,或聚焦於廣告主後台應用的模塊功能測試,或聚焦於搜索引擎的模塊功能測試,對於全鏈路功能的測試缺乏有效和全面的測試手段。而線上的業務監控,則側重於對業務效果指標的監控,如CTR(click through rate,點擊率)、CPC(cost per click,點擊成本)、RPM(revenue per impression,千次瀏覽收益)等。對涉及廣告主切身利益和平台總營收的廣告錯誤投放問題,缺乏有效的發現機制。

我們期望對在線搜索廣告引擎所有實際曝光的商品,通過反查數據庫中曝光時刻前它的最後狀態,來校驗它在數據庫中的投放狀態與搜索引擎中的狀態的一致性,做到線上廣告錯誤投放問題的實時發現。同時,通過不同的觸發檢測方式,做到數據變更的各個環節的有效覆蓋。

二、階段成果

我們藉助日誌流同步服務(TTLog)、海量數據NoSQL存儲系統(Lindorm)、實時業務校驗平台(BCP)、消息隊列(MetaQ)、在線數據實時同步服務(精衛)以及海量日誌實時分析系統(Xflush)實現了ICBU搜索廣告錯誤投放問題的線上實時發現,且覆蓋線上的全部用戶真實曝光流量。同時,通過在數據變更節點增加主動校驗的方式,可以做到在特定場景下(該廣告品尚未被用戶檢索)的線上問題先於用戶發現。

此外,藉助TTLog+實時計算引擎Blink+阿里雲日誌服務SLS+Xflush的技術體系,實現了線上引擎/算法效果的實時透出。

下面是ICBU廣告實時質量大盤:

從八月底開始投入線上使用,目前這套實時系統已經發現了多起線上問題,且幾乎都是直接影響資損和廣告主的利益。

三、技術實現

圖一:

1. 引擎曝光日誌數據處理

對於電商搜索廣告系統,當一個真實的用戶請求觸達(如圖一中1.1)時,會產生一次實時的廣告曝光,相對應地,搜索引擎的日誌里會寫入一條曝光記錄(如圖一中2)。我們通過日誌流同步服務TTLog對搜索引擎各個服務器節點上的日誌數據進行統一的搜集(如圖一中3),然後藉助數據對賬服務平台BCP對接TTLog中的“流式”數據(如圖一中4),對數據進行清洗、過濾、採樣,然後將待校驗的數據推送到消息隊列服務MetaQ(如圖一中5)。

2. DB數據處理

圖二:

如圖二所示,通常,業務數據庫MySQL針對每個領域對象,只會存儲它當前時刻最新的數據。為了獲取廣告品在引擎中真實曝光的時刻前的最後數據,我們通過精衛監聽數據庫中的每次數據變更,將變更數據“快照”寫入Lindorm(底層是HBase存儲,支持海量數據的隨機讀寫)。

3. 數據一致性校驗

在廣告測試服務igps(我們自己的應用)中,我們通過監聽MetaQ的消息變更,拉取MetaQ中待校驗的數據(如圖一中6),解析獲得曝光時每個廣告品在搜索引擎中的狀態,同時獲得其曝光的時刻點。然後基於曝光時刻點,通過查詢Lindorm,獲得廣告品於曝光時刻點前最後在MySQL中的數據狀態(如圖一中7)。然後igps對該次廣告曝光,校驗引擎中的數據狀態和MySQL中的數據狀態的一致性。

如果數據校驗不一致,則打印出錯誤日誌。最後,藉助海量日誌實時分析系統Xflush(如圖一中8),我們可以做到對錯誤數據的實時聚合統計、可視化展示以及監控報警。

4. 數據變更節點的主動校驗

因為線上的實時用戶搜索流量具有一定的隨機性,流量場景的覆蓋程度具有很大的不確定性,作為補充,我們在數據變更節點還增加了主動校驗。

整個數據鏈路,數據變更有兩個重要節點:

  • MySQL中的數據變更;
  • 引擎索引的切換。

對於MySQL中的數據變更:我們通過精衛監聽變更,針對單條數據的變更信息,構建出特定的引擎查詢請求串,發起查詢請求(如圖一中1.3)。

對於引擎索引的切換(主要是全量切換):我們通過離線對歷史(如過去7天)的線上廣告流量進行聚合分析/改寫,得到測試用例請求集合。再監聽線上引擎索引的切換操作。當引擎索引進行全量切換時,我們主動發起對引擎服務的批量請求(如圖一中1.2)。

上述兩種主動發起的請求,最後都會復用前面搭建的數據一致性校驗系統進行廣告投放狀態的校驗。

上圖是對廣告投放狀態的實時校驗錯誤監控圖,從圖中我們清晰看到當前時刻,搜索廣告鏈路的數據質量。無論是中美業務DB同步延遲、DB到引擎數據增量處理鏈路的延遲、或者是發布變更導致的邏輯出錯,都會導致錯誤數據曲線的異常上漲。校驗的規則覆蓋了推廣計劃(campaign)、推廣組(adgroup)、客戶狀態(customer)、詞的狀態(keyword)、品的狀態(feed)。校驗的節點覆蓋了曝光和點擊兩個不同的環節。

5. 引擎及算法的實時質量

圖三:

搜索引擎日誌pvlog中蘊含了非常多有價值的信息,利用好這些信息不僅可以做到線上問題的實時發現,還能幫助算法同學感知線上的實時效果提供抓手。如圖三所示,通過實時計算引擎Blink我們對TTLog中的pv信息進行解析和切分,然後將拆分的結果輸出到阿里雲日誌服務SLS中,再對接Xflush進行實時的聚合和可視化展示。

如上圖所示,上半年我們曾出現過一次線上的資損故障,是搜索應用端構造的搜索廣告引擎SP請求串中缺失了一個參數,導致部分頭部客戶的廣告沒有在指定地域投放,故障從發生到超過10+客戶上報才發現,歷經了10幾個小時。我們通過對SP請求串的實時key值和重要value值進行實時監控,可以快速發現key值或value值缺失的場景。

此外,不同召回類型、扣費類型、以及扣費價格的分佈,不僅可以監控線上異常狀態的出現,還可以給算法同學做實驗、調參、以及排查線上問題時提供參考。

四、幾個核心問題

1. why lindorm?

最初的實現,我們是通過精衛監聽業務DB的變更寫入另一個新的DB(MySQL),但是性能是一個非常大的瓶頸。我們的數據庫分了5+個物理庫,1000+張分表,單表的平均數據量達到1000+w行,總數據達到千億行。

后通過存儲的優化和按邏輯進行分表的方式,實現了查詢性能從平均1s到70ms的提升。

2. why BCP + MetaQ + igps?

最初我們是想直接使用BCP對數據進行校驗:通過igps封裝lindorm的查詢接口,然後提供hsf接口供在BCP里直接使用。

但是還是因為性能問題:TTLog的一條message平均包含60+條pv,每個pv可能有5個或更多廣告,每個廣告要查6張表,單條message在BCP校驗需要調用約60x5x6=1800次hsf請求。當我們在BCP中對TTLog的數據進行10%的採樣時,後端服務igps的性能已經出現瓶頸,hsf線程池被打滿,同時7台服務器的cpu平均使用率達到70%以上。

藉助MetaQ的引入,可以剔除hsf調用的網絡開銷,同時將消息的生產和消費解耦,當流量高峰到達時,igps可以保持自己的消費速率不變,更多的消息可以暫存在隊列里。通過這一優化,我們不僅扛住了10%的採樣,當線上採樣率開到100%時,我們的igps的服務器的平均cpu使用率仍只維持在20%上下,而且metaq中沒有出現消息堆積。

不過這樣一來,bcp的作用從原來的“採樣、過濾、校驗、報警”,只剩下“採樣、過濾”。無法發揮其通過在線編碼可以快速適應業務變化的作用。

3. why not all blink?

其實“BCP + MetaQ + igps”的流程可以被“Blink + SLS”取代,那為什麼不都統一使用Blink呢。

一方面,目前點擊的校驗由於其流量相對較小的因素,我們目前是直接在BCP里編寫的校驗代碼,不需要走發布流程,比較快捷。而且BCP擁有如“延遲校驗”、“限流控制”等個性化的功能。另一方面,從我們目前使用Blink的體驗來看,實時的處理引擎尚有一些不穩定的因素,尤其是會有不穩定的網絡抖動(可能是數據源和Blink workder跨機房導致)。

4. SP請求的key值如何拆分?

在做SP請求串的實時key值監控的時候,遇到了一個小難題:SP的請求串中參數key是動態的,並不是每個key都會在每個串中出現,而且不同的請求串key出現的順序是不一樣的。如何切分使其滿足Xflush的“列值分組”格式要求。

實現方式是,對每個sp請求串,使用Blink的udtf(自定義表值函數)進行解析,得到每個串的所有key和其對應的value。然後輸出時,按照“validKey={key},validValue={value}”的格式對每個sp請求串拆分成多行輸出。然後通過Xflush可以按照validKey進行分組,並對行數進行統計。

五、總結及後續規劃

本文介紹了通過大數據的處理技術做到電商搜索廣告場景下數據端到端一致性問題的實時發現,並且通過“實時發現”結合“數據變更節點的主動校驗”,實現數據全流程的一致性校驗。

後續的優化方向主要有兩方面:

  • 結合業務的使用場景,透出更豐富維度的實時數據。
  • 將該套技術體系“左移”到線下/預發測試階段,實現“功能、性能、效果”的一鍵式自動化測試,同時覆蓋從搜索應用到引擎的全鏈路。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

安卓JNI精細化講解,讓你徹底了解JNI(一):環境搭建與HelloWord

目錄

1、基礎概念

1.1、JNI

JNI(Java Native Interface)Java本地接口,使得Java與C/C++具有交互能力

1.2、NDK

NDK(Native Development Kit) 本地開發工具包,允許使用原生語言(C和C++)來實現應用程序的部分功能

Android NDK開發的主要作用:

1、特定場景下,提升應用性能;
2、代碼保護,增加反編譯難度;
3、生成庫文件,庫可重複使用,也便於平台、項目間移植;

1.3、CMake與ndk-build

當我們基於NDK開發出native功能后,通常需要編譯成庫文件,給Android項目使用。
目前,有兩種主流的編譯方式:__CMake__與ndk-build

__CMake__與__ndk-build__是兩種不同的編譯工具(與Android代碼和C/C++代碼無關)

CMake

CMake是Androidstudio2.2之後引入的跨平台編譯工具(特點:簡單易用,2.2之後是默認的NDK編譯工具)

如何配置:
   1、創建CMakeLists.txt文件,配置CMake必要參數;
   2、使用gradle配置CMakeLists.txt以及native相關參數;

如何編譯庫文件:
   1、Android Studio執行Build即可;

ndk-build

ndk-build是NDK中包含的腳本工具(可在NDK目錄下找到該工具,為了方便使用,通常配置NDK的環境變量)

如何配置:
   1、創建Android.mk文件,配置ndk-build必要參數;
   2、可選創建application.mk文件,配置ndk-build參數 (該文件的配置項可使用gradle的配置替代);
   3、使用gradle配置Android.mk以及native相關參數;

2、如何編譯庫文件(兩種方式):
   1、Android Studio執行Build即可(執行了:Android.mk + gradle配置);
   2、也可在Terminal、Mac終端、cmd終端中通過ndk-build命令直接構建庫文件(執行了:Android.mk)

2、環境搭建

JNI安裝
JNI 是JDK里的內容,電腦上正確安裝並配置JDK即可 (JDK1.1之後就正式支持了);

NDK安裝
可從官網自行下載、解壓到本地,也可基於AndroidStudio下載解壓到默認目錄;

編譯工具安裝
cmake 可基於AndroidStudio下載安裝;
ndk-build 是NDK里的腳本工具,NDK安裝好即可使用ndk-build;

當前演示,使用的Android Studio版本如下(當前最新版):

啟動Android Studio –> 打開SDK Manager –> SDK Tools,如下圖所示:

我們選擇NDK、CMake、LLDB(調試Native時才會使用),選擇Apply進行安裝,等安裝成功后,NDK開發所依賴的環境也就都齊全了。

3、Native C++ 項目(HelloWord案例)

3.1、項目創建(java / kotlin)

新建項目,選擇 Native C++,如下圖:

新創建的項目,默認已包含完整的native 示例代碼、cmake配置 ,如下圖:

這樣,我們就可以自己定義Java native方法,並在cpp目錄中寫native實現了,很方便。

但是,當我們寫完native的實現代碼,希望運行APP,查看JNI的交互效果,此時,就需要使用編譯工具了,咱們還是先看一下Android Studio默認的Native編譯方式吧:CMake

3.2、CMake的應用

在CMake編譯之前,咱們應該先做哪些準備工作?

1、NDK環境是否配置正確?
-- 如果未配置正確是無法進行C/C++開發的,更不用說CMake編譯了

2、C/C++功能是否實現? 
-- 此次演示主要使用系統默認創建的native-lib.cpp文件,關於具體如何實現:後續文章再詳細講解

3、CMakeLists.txt是否創建並正確配置? 
-- 該文件是CMake工具編譯的基礎,未配置或配置項錯誤,均會影響編譯結果

4、gradle是否正確配置?
-- gradle配置也是CMake工具編譯的基礎,未配置或配置項錯誤,均會影響編譯結果

除此之外,咱們還應該學習CMake的哪些重要知識?

1、CMake工具編譯生成的庫文件默認在什麼位置?apk中庫文件又是在什麼位置?
2、CMake工具如何指定編譯生成的庫文件位置?
3、CMake工具如何指定生成不同CPU平台對應的庫文件?

帶着這些問題,咱們開始CMake之旅吧:

3.2.1、NDK環境檢查

編譯前,建議先檢查下工程的NDK配置情況(不然容易報一些亂七八糟的錯誤):
File –> Project Structure –> SDK Location,如下圖(我本地的Android Studio默認沒有給配置NDK路徑,那麼,需要自己手動指定一下):

3.2.2、C/C++功能實現

因為本節主講CMake編譯工具,代碼就不單獨寫了,咱們直接使用工程默認生成的native-liv.cpp,簡單調整一下native實現方法的代碼吧(修改返迴文本信息):

因Native C++工程默認已配置好了CMakeLists.txt和gradle,所以咱們可直接運行工程看效果,如下圖:

JNI交互效果我們已經看到了,說明CMake編譯成功了。那麼,這究竟是怎麼做到的呢?咱們接着分析一下吧:

3.2.3、CMake生成的庫文件與apk中的庫文件

安卓工程編譯時,會執行CMake編譯,在 工程/app/build/…/cmake/ 中會產生對應的so文件,如下圖:

繼續編譯安卓工程,會根據build中的內容,生成我們的*.apk安裝包文件。我們找到、反編譯apk安裝包文件,查找so庫文件。原來在apk安裝包中,so庫都被存放在lib目錄中,如下圖:

3.2.4、CMake是如何編譯生成so庫的呢?

在前面介紹CMake定義時,提到了CMake是基於CMakeLists.txt文件和gradle配置實現編譯Native類的。那麼,咱們先來看一下CMakeLists.txt文件吧:

#cmake最低版本要求
cmake_minimum_required(VERSION 3.4.1)

#添加庫
add_library(
        # 庫名
        native-lib

        # 類型:
        # SHARED 是指動態庫,對應的是.so文件
        # STATIC 是指靜態庫,對應的是.a文件
        # 其他類型:略
        SHARED

        # native類路徑
        native-lib.cpp)

# 查找依賴庫
find_library( 
        # 依賴庫別名
        log-lib

        # 希望加到本地的NDK庫名稱,log指NDK的日誌庫
        log)


# 鏈接庫,建立關係( 此處就是指把log-lib 鏈接給 native-lib使用 )
target_link_libraries( 
        # 目標庫名稱(native-lib 是咱們要生成的so庫)
        native-lib

        # 要鏈接的庫(log-lib 是上面查找的log庫)
        ${log-lib})

實際上,CMakeList.txt可配置的內容遠不止這些,如:so輸出目錄,生成規則等等,有需要的同學可查下官網。

接着,咱們再看一下app的gradle又是如何配置CMake的呢?

apply plugin: 'com.android.application'

android {
    compileSdkVersion 29
    buildToolsVersion "29.0.1"
    defaultConfig {
        applicationId "com.qxc.testnativec"
        minSdkVersion 21
        targetSdkVersion 29
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
        //定義cmake默認配置屬性
        externalNativeBuild {
            cmake {
                cppFlags ""
            }
        }
    }
    
    //定義cmake對應的CMakeList.txt路徑(重要)
    externalNativeBuild {
        cmake {
            path "src/main/cpp/CMakeLists.txt"
        }
    }
}

dependencies {
    implementation fileTree(dir: 'libs', include: ['*.jar'])
    implementation 'androidx.appcompat:appcompat:1.1.0'
    implementation 'androidx.constraintlayout:constraintlayout:1.1.3'
    testImplementation 'junit:junit:4.12'
    androidTestImplementation 'androidx.test.ext:junit:1.1.1'
    androidTestImplementation 'androidx.test.espresso:espresso-core:3.2.0'
}

實際上,gradle可配置的cmake內容也遠不止這些,如:abi、cppFlags、arguments等,有需要的同學可查下官網。

3.2.5、如何指定庫文件的輸出目錄?

如果希望將so庫生成到特定目錄,並讓項目直接使用該目錄下的so,應該如何配置呢?
比較簡單:需要在CMakeList.txt中配置庫的輸出路徑信息,即:

CMAKE_LIBRARY_OUTPUT_DIRECTORY

# cmake最低版本要求
cmake_minimum_required(VERSION 3.4.1)

# 配置庫生成路徑
# CMAKE_CURRENT_SOURCE_DIR是指 cmake庫的源路徑,通常是build/.../cmake/
# /../jniLibs/是指與CMakeList.txt所在目錄的同級目錄:jniLibs (如果沒有會新建)
# ANDROID_ABI 生成庫文件時,採用gradle配置的ABI策略(即:生成哪些平台對應的庫文件)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../jniLibs/${ANDROID_ABI})

# 添加庫
add_library( # 庫名
        native-lib

        # 類型:
        # SHARED 是指動態庫,對應的是.so文件
        # STATIC 是指靜態庫,對應的是.a文件
        # 其他類型:略
        SHARED

        # native類路徑
        native-lib.cpp)

# 查找依賴庫
find_library(
        # 依賴庫別名
        log-lib

        # 希望加到本地的NDK庫名稱,log指NDK的日誌庫
        log)


# 鏈接庫,建立關係( 此處就是指把log-lib 鏈接給native-lib使用 )
target_link_libraries(
        # 目標庫名稱(native-lib就是咱們要生成的so庫)
        native-lib

        # 要鏈接的庫(上面查找的log庫)
        ${log-lib})

還需要在gradle中配置 jniLibs.srcDirs 屬性(即:指定了lib庫目錄):

sourceSets {
        main {
            jniLibs.srcDirs = ['jniLibs']//指定lib庫目錄
        }
    }

接着,重新build就會在cpp相同目錄級別位置生成jniLibs目錄,so庫也在其中了:

注意事項:
1、配置了CMAKE_CURRENT_SOURCE_DIR,並非表示編譯時直接將so生成在該目錄中,實際編譯時,so文件仍然是
先生成在build/.../cmake/中,然後再拷貝到目標目錄中的

2、如果只配置了CMAKE_CURRENT_SOURCE_DIR,並未在gradle中配置 jniLibs.srcDirs,也會有問題,如下:
More than one file was found with OS independent path 'lib/arm64-v8a/libnative-lib.so'

此問題是指:在編譯生成apk時,發現了多個so目錄,android studio不知道使用哪一個了,所以需要咱們
告訴android studio當前工程使用的是jniLibs目錄,而非build/.../cmake/目錄
3.2.5、如何生成指定CPU平台對應的庫文件呢?

我們可以在cmake中設置abiFilters,也可在ndk中設置abiFilters,效果是一樣的:

defaultConfig {
        applicationId "com.qxc.testnativec"
        minSdkVersion 21
        targetSdkVersion 29
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
        externalNativeBuild {
            cmake {
                cppFlags ""
                abiFilters "arm64-v8a"
            }
        }
    }
defaultConfig {
        applicationId "com.qxc.testnativec"
        minSdkVersion 21
        targetSdkVersion 29
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
        externalNativeBuild {
            cmake {
                cppFlags ""
            }
        }
        ndk {
            abiFilters "arm64-v8a"
        }
    }

按照新的配置,我們重新運行工程,如下圖:

再反編譯看下工程,果然只有arm64-v8a的so庫了,不過庫文件在apk中仍然是放在lib目錄,而非jniLibs(其實也很好理解,jniLibs只是我們本地的目錄,便於我們管理庫文件,真正生成apk時,仍然會按照lib目錄放置庫文件),如下圖:

至此,CMake的主要技術點都講完了,接下來咱們看下NDK-Build吧~

3.3、ndk-build的應用

在ndk-build編譯之前,咱們又應該先做哪些準備工作?

1、ndk-build環境變量是否正確配置?
-- 如果未配置,是無法在cmd、Mac終端、Terminal中使用ndk-build命令的(會報錯:找不到命令)

2、NDK環境是否配置正確?
-- 如果未配置正確是無法進行C/C++開發的,更不用說ndk-build編譯了

3、C/C++功能是否實現?
-- 此次演示主要使用系統默認創建的native-lib.cpp文件,關於具體如何實現:後續文章再詳細講解
-- 注意:為了與CMake區分,咱們新建一個“jni”目錄存放C/C++文件、配置文件吧

4、Android.mk是否創建並正確配置? 
-- 該文件是ndk-build工具編譯的基礎,未配置或配置項錯誤,均會影響編譯結果

5、gradle是否正確配置?(可選項,如果通過cmd、Mac終端、Terminal執行ndk-build,可忽略)
-- gradle配置也是ndk-build工具編譯的基礎,未配置或配置項錯誤,均會影響編譯結果

6、Application.mk是否創建並正確配置?(可選項,一般配ABI、版本,這些項均可在gradle中配置)
-- 該文件也是ndk-build工具編譯的基礎,未配置或配置項錯誤,均會影響編譯結果

除此之外,咱們還應該學習ndk-build的哪些重要知識?

1、ndk-build工具如何指定編譯生成的庫文件位置?
2、ndk-build工具如何指定生成不同CPU平台對應的庫文件?

帶着這些問題,咱們繼續ndk-build之旅吧:

3.3.1、環境變量配置

介紹NDK-Build定義時,提到了其實它是NDK的腳本工具。那麼,咱們還是先進NDK目錄找一下吧,ndk-build工具的位置如下圖:

如果我們希望任意情況下都能便捷的使用這種腳本工具,通常做法是配置其環境變量,否則我們在cmd、Mac終端、Terminal中執行 ndk-build 命令時,會報錯:“未找到命令”

配置NDK的環境變量,也很簡單,以Mac電腦舉例(如果是Windows電腦,網上也有很多關於配置環境變量的文章,如果有需要可自行查下):

1、打開命令終端,輸入命令: open -e .bash_profile,打開bash_profile配置文件

2、寫入如下內容(NDK_HOME指向 ndk-build 所在路徑):
export NDK_HOME=/Users/xc/SDK/android-sdk-macosx/ndk/20.1.5948944
export PATH=$PATH:$NDK_HOME

3、生效.bash_profile配置
source .bash_profile

當我們在cmd、Mac終端、Terminal中執行 ndk-build 命令時,如果出現下圖所示內容,則代表配置成功了:

3.3.2、C/C++功能實現

咱們使用比較常用的一種ndk-build方式吧:ndk-build + Android.mk + gradle配置

項目中新建jni目錄,拷貝一份CMake的代碼實現吧:

1、新建jni目錄
2、拷貝cpp/native-lib.cpp 至 jni目錄下
3、重命名為haha.cpp (與CMake區分)
4、調整一下native實現方法的文本(與CMake運行效果區分)
5、新建Android.mk文件

接着,編寫Android.mk文件內容:

#表示Android.mk所在目錄
LOCAL_PATH := $(call my-dir)

#CLEAR_VARS變量指向特殊 GNU Makefile,用於清除部分LOCAL_變量
include $(CLEAR_VARS)

#模塊名稱
LOCAL_MODULE    := haha
#構建系統用於生成模塊的源文件列表
LOCAL_SRC_FILES := haha.cpp

#BUILD_SHARED_LIBRARY 表示.so動態庫
#BUILD_STATIC_LIBRARY 表示.a靜態庫
include $(BUILD_SHARED_LIBRARY)

配置gradle:

apply plugin: 'com.android.application'
android {
    compileSdkVersion 28
    defaultConfig {
        applicationId "com.aaa.testnative"
        minSdkVersion 16
        targetSdkVersion 28
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"

        //定義ndkBuild默認配置屬性
        externalNativeBuild {
            ndkBuild {
                cppFlags ""
            }
        }
    }
   
    //定義ndkBuild對應的Android.mk路徑(重要)
    externalNativeBuild {
        ndkBuild{
            path "src/main/jni/Android.mk"
        }
    }
}

dependencies {
    implementation fileTree(dir: 'libs', include: ['*.jar'])
    implementation 'com.android.support:appcompat-v7:28.0.0'
    implementation 'com.android.support.constraint:constraint-layout:1.1.3'
    testImplementation 'junit:junit:4.12'
    androidTestImplementation 'com.android.support.test:runner:1.0.2'
    androidTestImplementation 'com.android.support.test.espresso:espresso-core:3.0.2'
}

現在,native代碼、ndk-build配置都完成了,咱們運行看一下效果吧,如下圖:

3.3.4、如何指定庫文件的輸出目錄?

通常,可在Android.mk文件中配置NDK_APP_DST_DIR
指定源目錄與輸出目錄(與CMake類似)

#表示Android.mk所在目錄
LOCAL_PATH := $(call my-dir)

#設置庫文件的輸入目錄
#輸出目錄 ../jniLibs/
#源目錄 $(TARGET_ARCH_ABI)
NDK_APP_DST_DIR=../jniLibs/$(TARGET_ARCH_ABI)

#CLEAR_VARS變量指向特殊 GNU Makefile,用於清除部分LOCAL_變量
include $(CLEAR_VARS)

#模塊名稱
LOCAL_MODULE    := haha
#構建系統用於生成模塊的源文件列表
LOCAL_SRC_FILES := haha.cpp

#BUILD_SHARED_LIBRARY 表示.so動態庫
#BUILD_STATIC_LIBRARY 表示.a靜態庫
include $(BUILD_SHARED_LIBRARY)
3.3.5、如何生成指定CPU平台對應的庫文件呢?

可在gradle中配置abiFilters(與Cmake類似)

externalNativeBuild {
            ndkBuild {
                cppFlags ""
                abiFilters "arm64-v8a"
            }
        }
externalNativeBuild {
            ndkBuild {
                cppFlags ""
            }
        }
  ndk {
            abiFilters "arm64-v8a"
        }
3.3.6、如何在Terminal中直接通過ndk-build命令構建庫文件呢?

除了執行AndroidStudio的build命令,基於gradle配置 + Android.mk編譯生成庫文件,我們還可以在cmd、Mac 終端、Terminal中直接通過ndk-build命令構建庫文件,此處以Terminal為例進行演示吧:

先進入包含Android.mk文件的jni目錄(Android Studio中可直接選中jni目錄並拖拽到Terminal中,會自動跳轉到該目錄),再執行ndk-build命令,如下圖:

同樣,編譯也成功了,如下圖:

因是直接在Terminal中執行了ndk-build命令,所以只會根據Android.mk進行編譯(不包含gradle配置內容,也就不會執行abiFilters過濾),生成了所有默認CPU平台的so庫文件。

ndk-build命令其實也可以配上一些參數使用,此處就不再詳解了。日常開發時,還是建議選擇CMake作為Native編譯工具,因為是安卓主推的,而且更簡單一些。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享